bft_bench_echo/
echo_bft_binding.rs

1use async_stream::stream;
2use async_trait::async_trait;
3use tokio::sync::mpsc::{Sender, channel};
4use tokio_stream::StreamExt;
5use tonic::Streaming;
6use uuid::{Bytes, Uuid};
7
8use bft_bench_core::{
9    BftBinding, BftError, BftReader, BftWriter, Config, Node, NodeAccess, ReadWriteNode, Result,
10    WriteNode,
11};
12
13pub mod pb {
14    tonic::include_proto!("grpc.echo");
15}
16
/// Capacity handed to the `tokio` mpsc channel that feeds the outbound stream.
///
/// The unit is queued `pb::Tx` messages, not bytes (128 Mi messages).
// NOTE(review): with a 1000-byte payload per message this bound is far above
// any realistic memory budget, so the channel is effectively unbounded —
// confirm that this is intended.
const CHANNEL_BUFFER_SIZE: usize = 128 << 20;
18
/// Benchmark binding for the echo service.
///
/// The type carries no state of its own; connections are opened on demand
/// when a node is accessed.
pub struct EchoBftBinding {}
20
21#[derive(Clone)]
22pub struct Writer {
23    sender: Sender<pb::Tx>,
24}
25
26pub struct Reader {
27    response_stream: Streaming<pb::Tx>,
28}
29
30#[async_trait]
31impl BftBinding for EchoBftBinding {
32    type Writer = Writer;
33
34    type Reader = Reader;
35
36    fn new(config: &Config) -> Self {
37        if config.nodes.len() != 1 {
38            unsupported();
39        }
40        EchoBftBinding {}
41    }
42
43    async fn access(&mut self, node: Node) -> NodeAccess<Self::Writer, Self::Reader> {
44        match node {
45            Node::Write(WriteNode { endpoint: _ }) => {
46                unsupported();
47            }
48            Node::ReadWrite(ReadWriteNode {
49                node: WriteNode { endpoint },
50            }) => {
51                let (sender, mut receiver) = channel::<pb::Tx>(CHANNEL_BUFFER_SIZE);
52                let in_stream = stream! {
53                    while let Some(tx) = receiver.recv().await {
54                        yield tx
55                    }
56                };
57
58                let mut client = pb::service_client::ServiceClient::connect(format!(
59                    "http://{}:{}",
60                    endpoint.host, endpoint.port
61                ))
62                .await
63                .unwrap();
64                let response = client.echo(in_stream).await.unwrap();
65
66                let response_stream = response.into_inner();
67
68                NodeAccess::ReadWriteAccess {
69                    writer: Writer { sender },
70                    reader: Reader { response_stream },
71                }
72            }
73        }
74    }
75}
76
77#[async_trait]
78impl BftWriter for Writer {
79    async fn write(&mut self, key: Uuid) -> Result<()> {
80        match self
81            .sender
82            .send(pb::Tx {
83                tx_id: Vec::from(key.to_bytes_le()),
84                value: vec![0; 1000],
85            })
86            .await
87        {
88            Ok(_) => Ok(()),
89            Err(error) => Err(BftError::dynamic(format!(
90                "Error sending to the channel: {}",
91                error
92            ))),
93        }
94    }
95}
96
97#[async_trait]
98impl BftReader for Reader {
99    async fn read(&mut self) -> Result<Option<Uuid>> {
100        match self.response_stream.next().await {
101            Some(Ok(tx)) => {
102                let mut arr: Bytes = [0u8; 16];
103                arr[..16].copy_from_slice(&tx.tx_id[..16]);
104                Ok(Some(Uuid::from_bytes_le(arr)))
105            }
106            Some(Err(error)) => Err(BftError::dynamic(format!(
107                "Error receiving from the stream: {}",
108                error
109            ))),
110            None => Ok(None), // stream closed
111        }
112    }
113}
114
/// Aborts with the message used for every configuration this binding rejects.
///
/// # Panics
///
/// Always panics; the function never returns.
fn unsupported() -> ! {
    panic!("Only one read-write node is supported")
}