bft_bench_shortcircuit/
lib.rs1use std::collections::HashMap;
2
3use async_trait::async_trait;
4
5use uuid::Uuid;
6
7use bft_bench_core::{
8 BftBinding, BftError, BftReader, BftWriter, Config, Node, NodeAccess, NodeEndpoint,
9 ReadWriteNode, Result, WriteNode,
10};
11use tokio::sync::broadcast::{Receiver, Sender, channel};
12
/// Capacity, in messages, requested for the broadcast channel that links
/// writers to readers (128 Mi entries).
///
/// NOTE(review): this is a very large capacity; confirm against tokio's
/// broadcast documentation that the resulting memory footprint is acceptable
/// on the benchmark hosts.
const CHANNEL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
14
15pub struct ShortCircuitedBftBinding {
16 writer: Writer,
17 first_reader: Option<Reader>,
18 readers: HashMap<NodeEndpoint, Reader>,
19}
20
21#[derive(Clone)]
22pub struct Writer {
23 sender: Sender<Uuid>,
24}
25
26pub struct Reader {
27 receiver_factory: Sender<Uuid>,
28 receiver: Receiver<Uuid>,
29}
30
31impl Clone for Reader {
32 fn clone(&self) -> Self {
33 Self {
34 receiver_factory: self.receiver_factory.clone(),
35 receiver: self.receiver_factory.subscribe(),
36 }
37 }
38}
39
40#[async_trait]
41impl BftBinding for ShortCircuitedBftBinding {
42 type Writer = Writer;
43
44 type Reader = Reader;
45
46 fn new(_: &Config) -> Self {
47 let (sender, receiver) = channel::<Uuid>(CHANNEL_BUFFER_SIZE);
48 let sender_for_reader = sender.clone();
49 ShortCircuitedBftBinding {
50 writer: Writer { sender },
51 first_reader: Some(Reader {
54 receiver_factory: sender_for_reader,
55 receiver,
56 }),
57 readers: HashMap::new(),
58 }
59 }
60
61 async fn access(&mut self, node: Node) -> NodeAccess<Self::Writer, Self::Reader> {
62 match node {
63 Node::Write(WriteNode { endpoint: _ }) => NodeAccess::WriteOnlyAccess {
64 writer: self.writer.clone(),
65 },
66 Node::ReadWrite(ReadWriteNode {
67 node: WriteNode { endpoint },
68 }) => NodeAccess::ReadWriteAccess {
69 writer: self.writer.clone(),
70 reader: self
71 .readers
72 .entry(endpoint)
73 .or_insert(match self.first_reader.take() {
74 Some(reader) => reader,
75 None => Reader {
76 receiver_factory: self.writer.sender.clone(),
77 receiver: self.writer.sender.subscribe(),
78 },
79 })
80 .clone(),
81 },
82 }
83 }
84}
85
86#[async_trait]
87impl BftWriter for Writer {
88 async fn write(&mut self, key: Uuid) -> Result<()> {
89 match self.sender.send(key) {
90 Ok(_) => Ok(()),
91 Err(error) => Err(BftError::dynamic(format!(
92 "Error sending to the channel: {}",
93 error
94 ))),
95 }
96 }
97}
98
99#[async_trait]
100impl BftReader for Reader {
101 async fn read(&mut self) -> Result<Option<Uuid>> {
102 match self.receiver.recv().await {
103 Ok(uuid) => Ok(Some(uuid)),
104 Err(error) => Err(BftError::dynamic(format!(
105 "Error receiving from the channel: {}",
106 error
107 ))),
108 }
109 }
110}