bft_bench_shortcircuit/
lib.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4
5use uuid::Uuid;
6
7use bft_bench_core::{
8    BftBinding, BftError, BftReader, BftWriter, Config, Node, NodeAccess, NodeEndpoint,
9    ReadWriteNode, Result, WriteNode,
10};
11use tokio::sync::broadcast::{Receiver, Sender, channel};
12
13const CHANNEL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
14
15pub struct ShortCircuitedBftBinding {
16    writer: Writer,
17    first_reader: Option<Reader>,
18    readers: HashMap<NodeEndpoint, Reader>,
19}
20
21#[derive(Clone)]
22pub struct Writer {
23    sender: Sender<Uuid>,
24}
25
26pub struct Reader {
27    receiver_factory: Sender<Uuid>,
28    receiver: Receiver<Uuid>,
29}
30
31impl Clone for Reader {
32    fn clone(&self) -> Self {
33        Self {
34            receiver_factory: self.receiver_factory.clone(),
35            receiver: self.receiver_factory.subscribe(),
36        }
37    }
38}
39
40#[async_trait]
41impl BftBinding for ShortCircuitedBftBinding {
42    type Writer = Writer;
43
44    type Reader = Reader;
45
46    fn new(_: &Config) -> Self {
47        let (sender, receiver) = channel::<Uuid>(CHANNEL_BUFFER_SIZE);
48        let sender_for_reader = sender.clone();
49        ShortCircuitedBftBinding {
50            writer: Writer { sender },
51            // We store and reuse the first reader for the first node (see `access` below) so that there's no
52            // chance of receiving errors when sending/receiving due to all opposite handles being closed.
53            first_reader: Some(Reader {
54                receiver_factory: sender_for_reader,
55                receiver,
56            }),
57            readers: HashMap::new(),
58        }
59    }
60
61    async fn access(&mut self, node: Node) -> NodeAccess<Self::Writer, Self::Reader> {
62        match node {
63            Node::Write(WriteNode { endpoint: _ }) => NodeAccess::WriteOnlyAccess {
64                writer: self.writer.clone(),
65            },
66            Node::ReadWrite(ReadWriteNode {
67                node: WriteNode { endpoint },
68            }) => NodeAccess::ReadWriteAccess {
69                writer: self.writer.clone(),
70                reader: self
71                    .readers
72                    .entry(endpoint)
73                    .or_insert(match self.first_reader.take() {
74                        Some(reader) => reader,
75                        None => Reader {
76                            receiver_factory: self.writer.sender.clone(),
77                            receiver: self.writer.sender.subscribe(),
78                        },
79                    })
80                    .clone(),
81            },
82        }
83    }
84}
85
86#[async_trait]
87impl BftWriter for Writer {
88    async fn write(&mut self, key: Uuid) -> Result<()> {
89        match self.sender.send(key) {
90            Ok(_) => Ok(()),
91            Err(error) => Err(BftError::dynamic(format!(
92                "Error sending to the channel: {}",
93                error
94            ))),
95        }
96    }
97}
98
99#[async_trait]
100impl BftReader for Reader {
101    async fn read(&mut self) -> Result<Option<Uuid>> {
102        match self.receiver.recv().await {
103            Ok(uuid) => Ok(Some(uuid)),
104            Err(error) => Err(BftError::dynamic(format!(
105                "Error receiving from the channel: {}",
106                error
107            ))),
108        }
109    }
110}