bft_bench_echo/
echo_bft_binding.rs1use async_stream::stream;
2use async_trait::async_trait;
3use tokio::sync::mpsc::{Sender, channel};
4use tokio_stream::StreamExt;
5use tonic::Streaming;
6use uuid::{Bytes, Uuid};
7
8use bft_bench_core::{
9 BftBinding, BftError, BftReader, BftWriter, Config, Node, NodeAccess, ReadWriteNode, Result,
10 WriteNode,
11};
12
13pub mod pb {
14 tonic::include_proto!("grpc.echo");
15}
16
/// Capacity handed to the bounded `tokio` mpsc channel that feeds the outgoing
/// request stream (128 Mi slots).
///
/// NOTE(review): the channel capacity counts queued messages, not bytes —
/// confirm that a bound this large is intentional.
const CHANNEL_BUFFER_SIZE: usize = 128 << 20;
18
/// Binding for the gRPC echo service.
///
/// The type carries no state of its own; each connection is created in
/// `access` and handed out as a `Writer`/`Reader` pair.
#[derive(Debug, Default)]
pub struct EchoBftBinding {}
20
21#[derive(Clone)]
22pub struct Writer {
23 sender: Sender<pb::Tx>,
24}
25
26pub struct Reader {
27 response_stream: Streaming<pb::Tx>,
28}
29
30#[async_trait]
31impl BftBinding for EchoBftBinding {
32 type Writer = Writer;
33
34 type Reader = Reader;
35
36 fn new(config: &Config) -> Self {
37 if config.nodes.len() != 1 {
38 unsupported();
39 }
40 EchoBftBinding {}
41 }
42
43 async fn access(&mut self, node: Node) -> NodeAccess<Self::Writer, Self::Reader> {
44 match node {
45 Node::Write(WriteNode { endpoint: _ }) => {
46 unsupported();
47 }
48 Node::ReadWrite(ReadWriteNode {
49 node: WriteNode { endpoint },
50 }) => {
51 let (sender, mut receiver) = channel::<pb::Tx>(CHANNEL_BUFFER_SIZE);
52 let in_stream = stream! {
53 while let Some(tx) = receiver.recv().await {
54 yield tx
55 }
56 };
57
58 let mut client = pb::service_client::ServiceClient::connect(format!(
59 "http://{}:{}",
60 endpoint.host, endpoint.port
61 ))
62 .await
63 .unwrap();
64 let response = client.echo(in_stream).await.unwrap();
65
66 let response_stream = response.into_inner();
67
68 NodeAccess::ReadWriteAccess {
69 writer: Writer { sender },
70 reader: Reader { response_stream },
71 }
72 }
73 }
74 }
75}
76
77#[async_trait]
78impl BftWriter for Writer {
79 async fn write(&mut self, key: Uuid) -> Result<()> {
80 match self
81 .sender
82 .send(pb::Tx {
83 tx_id: Vec::from(key.to_bytes_le()),
84 value: vec![0; 1000],
85 })
86 .await
87 {
88 Ok(_) => Ok(()),
89 Err(error) => Err(BftError::dynamic(format!(
90 "Error sending to the channel: {}",
91 error
92 ))),
93 }
94 }
95}
96
97#[async_trait]
98impl BftReader for Reader {
99 async fn read(&mut self) -> Result<Option<Uuid>> {
100 match self.response_stream.next().await {
101 Some(Ok(tx)) => {
102 let mut arr: Bytes = [0u8; 16];
103 arr[..16].copy_from_slice(&tx.tx_id[..16]);
104 Ok(Some(Uuid::from_bytes_le(arr)))
105 }
106 Some(Err(error)) => Err(BftError::dynamic(format!(
107 "Error receiving from the stream: {}",
108 error
109 ))),
110 None => Ok(None), }
112 }
113}
114
/// Aborts with the message used for every unsupported node configuration.
///
/// `#[track_caller]` makes the reported panic location the call site rather
/// than this helper, so the failing check is visible in the panic output.
///
/// # Panics
///
/// Always.
#[cold]
#[track_caller]
fn unsupported() -> ! {
    panic!("Only one read-write node is supported")
}