use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

use histogram::Histogram;
use tokio::task::spawn;
use tokio::time::sleep;
use tokio::{sync::broadcast, sync::mpsc};
use uuid::Uuid;

use reader::ReaderReply;
use stats::*;
use worker::WorkerRequest;
use writer::WriterReply;
pub use {bft_binding::*, config::*, result::*};

pub mod bft_binding;
pub mod config;
pub mod result;
pub mod stats;

mod reader;
mod worker;
mod writer;

pub const UUID_SIZE: usize = 16;

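// Buffer sizes for the broadcast control channels (which only ever carry a stop request)
// and for the mpsc data channels carrying worker replies back to the driver.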
const CONTROL_CHANNELS_BUFFER: usize = 1;
const DATA_CHANNELS_BUFFER: usize = 128 * 1024 * 1024;

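/// Tracks a transaction that is still in flight. `Written` means the write has been
/// acknowledged and the listed read nodes have not yet observed it;
/// `ReadWhenWriteDataAvailable` means a reader observed the transaction before the
/// writer's reply was processed.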
enum WriteStatus {
    Written {
        write_start: Instant,
        nodes_awaiting_read: HashSet<usize>,
    },
    ReadWhenWriteDataAvailable {
        read_completion_instant: Instant,
        node_idx: usize,
    },
}

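/// Mutable state of the benchmark driver loop: in-flight transactions keyed by UUID,
/// the broadcast channels used to ask workers to stop, the channels receiving worker
/// replies, and the accumulated statistics.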
struct BftBenchmarkState {
    in_progress_writes: HashMap<Uuid, WriteStatus>,
    tx_writers_control: broadcast::Sender<WorkerRequest>,
    rx_incoming_writes: mpsc::Receiver<WriterReply>,
    tx_readers_control: broadcast::Sender<WorkerRequest>,
    rx_incoming_reads: mpsc::Receiver<ReaderReply>,
    stats: Stats,
}

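/// Runs the benchmark: obtains read and/or write access to every configured node, spawns
/// one writer task per writable node and one reader task per readable node, then drives
/// the event loop until the configured run duration elapses and every worker has reported
/// completion. Returns the collected global and per-node reports.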
pub async fn run<B: BftBinding + 'static>(config: Config, mut bft_binding: B) -> Result<Reports> {
    let (tx_writers_control, rx_writers_control) = broadcast::channel(CONTROL_CHANNELS_BUFFER);
    let (tx_incoming_writes, rx_incoming_writes) = mpsc::channel(DATA_CHANNELS_BUFFER);
    let (tx_readers_control, rx_readers_control) = broadcast::channel(CONTROL_CHANNELS_BUFFER);
    let (tx_incoming_reads, rx_incoming_reads) = mpsc::channel(DATA_CHANNELS_BUFFER);

    let mut state = BftBenchmarkState {
        in_progress_writes: HashMap::<Uuid, WriteStatus>::new(),
        tx_writers_control,
        rx_incoming_writes,
        tx_readers_control,
        rx_incoming_reads,
        stats: Stats::new(
            Instant::now(),
            config.nodes.len(),
            config
                .nodes
                .iter()
                .filter(|node| matches!(**node, Node::ReadWrite(_)))
                .count(),
        ),
    };

    let mut writers_to_go = config
        .nodes
        .iter()
        .enumerate()
        .map(|(node_idx, _)| node_idx)
        .collect::<HashSet<_>>();

    let mut write_accesses = HashMap::<usize, B::Writer>::new();
    let mut read_accesses = HashMap::<usize, B::Reader>::new();
    for (node_idx, node) in config.nodes.into_iter().enumerate() {
        match bft_binding.access(node).await {
            NodeAccess::ReadWriteAccess { reader, writer } => {
                write_accesses.insert(node_idx, writer);
                read_accesses.insert(node_idx, reader);
            }
            NodeAccess::WriteOnlyAccess { writer } => {
                write_accesses.insert(node_idx, writer);
            }
        }
    }

    let read_indices = start_reads::<B>(
        read_accesses,
        rx_readers_control,
        tx_incoming_reads,
        config.read_grace,
    );
    let mut readers_to_go = read_indices.clone();

    log::info!("Read nodes: {:?}", read_indices);

    start_writes::<B>(
        write_accesses,
        config.write_interval,
        rx_writers_control,
        tx_incoming_writes,
    );

    let (tx_report, mut rx_report) = mpsc::channel(CONTROL_CHANNELS_BUFFER);
    let tx_report_periodic = tx_report.clone();
    spawn(async move {
        sleep(config.run_duration).await;
        tx_report
            .send(true)
            .await
            .expect("Cannot send benchmark completion request");
    });
    let periodic_report_awaiter = config.report_interval.map_or(spawn(async move {}), |i| {
        spawn(async move {
            loop {
                sleep(i).await;
                if tx_report_periodic.send(false).await.is_err() {
                    log::debug!("Report channel closed, stopping periodic report");
                    break;
                }
            }
        })
    });

    loop {
        tokio::select! {
            Some(write) = state.rx_incoming_writes.recv() => {
                if handle_writer_reply(&mut state, &mut writers_to_go, &mut readers_to_go, &read_indices, write) {
                    break;
                }
            }
            Some(read) = state.rx_incoming_reads.recv() => {
                if handle_reader_reply(&mut state, &mut writers_to_go, &mut readers_to_go, read) {
                    break;
                }
            }
            Some(complete) = rx_report.recv() => {
                if complete {
                    log::info!("Benchmark duration elapsed, requesting readers' and writers' completion");
                    rx_report.close();
                    request_stop(&mut state);
                } else {
                    log::info!("Periodic stats follow: {}", Into::<Reports>::into(&state.stats));
                }
            }
        }
    }

    let _ = periodic_report_awaiter.await;

    Ok((&state.stats).into())
}

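/// Spawns one writer task per node with write access; each writer submits transactions
/// paced by `write_interval` and reports outcomes on `tx_incoming_writes`.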
fn start_writes<B: BftBinding + 'static>(
    write_accesses: HashMap<usize, B::Writer>,
    write_interval: Duration,
    rx_writers_control: broadcast::Receiver<WorkerRequest>,
    tx_incoming_writes: mpsc::Sender<WriterReply>,
) {
    log::info!("Starting writers");
    for (node_idx, writer) in write_accesses.into_iter() {
        log::info!("Starting writer for node {}", node_idx);
        spawn(writer::write::<B::Writer>(
            writer,
            node_idx,
            write_interval,
            rx_writers_control.resubscribe(),
            tx_incoming_writes.clone(),
        ));
    }
}

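/// Spawns one reader task per node with read access and returns the set of read node
/// indices; each reader reports observed transactions on `tx_incoming_reads` and is
/// given the configured `read_grace` duration.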
fn start_reads<B: BftBinding + 'static>(
    read_accesses: HashMap<usize, B::Reader>,
    rx_readers_control: broadcast::Receiver<WorkerRequest>,
    tx_incoming_reads: mpsc::Sender<ReaderReply>,
    read_grace: Duration,
) -> HashSet<usize> {
    log::info!("Starting readers");
    let mut read_indices = HashSet::<usize>::new();
    for (node_idx, reader) in read_accesses.into_iter() {
        log::info!("Starting reader for node {}", node_idx);
        read_indices.insert(node_idx);
        spawn(reader::read::<B::Reader>(
            node_idx,
            reader,
            rx_readers_control.resubscribe(),
            tx_incoming_reads.clone(),
            read_grace,
        ));
    }
    read_indices
}

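/// Processes a single writer reply: records the write statistics, reconciles the
/// transaction with any read that arrived before the write reply, and tracks writer
/// completion. Returns `true` once all writers and readers have completed.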
fn handle_writer_reply(
    state: &mut BftBenchmarkState,
    writers_to_go: &mut HashSet<usize>,
    readers_to_go: &mut HashSet<usize>,
    read_indices: &HashSet<usize>,
    write: WriterReply,
) -> bool {
    match write {
        WriterReply::SuccessfulWrite {
            write_start,
            write_duration,
            uuid,
            node_idx,
        } => {
            log::debug!("Writer {} succeeded write {}", node_idx, uuid);
            let write_duration_nanos = u64_nanos(write_duration);
            update_op_stat(
                &mut state.stats.global_write,
                state.stats.nodes_writes.get_mut(node_idx).unwrap(),
                write_duration_nanos,
                true,
            );
            match state.in_progress_writes.remove(&uuid) {
                Some(WriteStatus::ReadWhenWriteDataAvailable {
                    read_completion_instant,
                    node_idx,
                }) => {
                    let nodes_awaiting_read = read_indices.clone();
                    if !complete_read(
                        "Writer",
                        state,
                        nodes_awaiting_read,
                        &uuid,
                        node_idx,
                        &write_start,
                        read_completion_instant - write_start,
                    ) {
                        log::error!(
                            "Read of transaction {} was recorded for node {}, which is not a read node",
                            uuid,
                            node_idx
                        );
                        panic!(
                            "Read of transaction {} was recorded for node {}, which is not a read node",
                            uuid, node_idx
                        );
                    }
                }
                Some(WriteStatus::Written { .. }) => {
                    log::error!(
                        "Writer {} trying to perform a duplicate write {}",
                        node_idx,
                        uuid
                    );
                    panic!(
                        "Writer {} trying to perform a duplicate write {}",
                        node_idx, uuid
                    );
                }
                None => {
                    state.in_progress_writes.insert(
                        uuid,
                        WriteStatus::Written {
                            write_start,
                            nodes_awaiting_read: read_indices.clone(),
                        },
                    );
                }
            }
        }
        WriterReply::FailedWrite {
            write_duration,
            uuid,
            node_idx,
        } => {
            log::error!("Writer {} failed write {}", node_idx, uuid);
            let write_duration_nanos = u64_nanos(write_duration);
            update_op_stat(
                &mut state.stats.global_write,
                state.stats.nodes_writes.get_mut(node_idx).unwrap(),
                write_duration_nanos,
                false,
            );
        }
        WriterReply::Completed { node_idx } => {
            log::info!("Writer {} completed", node_idx);
            writers_to_go.remove(&node_idx);
            log::info!("Writers to go: {:?}", writers_to_go);
            log::info!("Readers to go: {:?}", readers_to_go);
            if writers_to_go.is_empty() && readers_to_go.is_empty() {
                return true;
            }
        }
    }

    false
}

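/// Processes a single reader reply: records the read statistics, matches the read against
/// its in-progress write (or parks it until the write reply arrives), and tracks reader
/// completion. Returns `true` once all writers and readers have completed.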
fn handle_reader_reply(
    state: &mut BftBenchmarkState,
    writers_to_go: &mut HashSet<usize>,
    readers_to_go: &mut HashSet<usize>,
    read: ReaderReply,
) -> bool {
    match read {
        ReaderReply::SuccessfulRead {
            read_completion_instant,
            read_duration,
            uuid,
            node_idx,
        } => {
            log::debug!("Reader {} succeeded read {}", node_idx, uuid);
            let read_duration_nanos = u64_nanos(read_duration);
            update_op_stat(
                &mut state.stats.global_read.op,
                &mut state.stats.nodes_reads.get_mut(node_idx).unwrap().op,
                read_duration_nanos,
                true,
            );
            match state.in_progress_writes.remove(&uuid) {
                Some(WriteStatus::Written {
                    write_start,
                    nodes_awaiting_read,
                }) => {
                    if !complete_read(
                        "Reader",
                        state,
                        nodes_awaiting_read,
                        &uuid,
                        node_idx,
                        &write_start,
                        write_start.elapsed(),
                    ) {
                        log::error!("Duplicate read {} from reader {}", uuid, node_idx);
                        panic!("Duplicate read {} from reader {}", uuid, node_idx);
                    }
                }

                Some(WriteStatus::ReadWhenWriteDataAvailable { .. }) => {
                    log::error!("Duplicate read {} from reader {}", uuid, node_idx);
                    panic!("Duplicate read {} from reader {}", uuid, node_idx);
                }

                None => {
                    log::debug!(
                        "Reader {} found that write data for {} is not yet available",
                        node_idx,
                        uuid
                    );
                    state.in_progress_writes.insert(
                        uuid,
                        WriteStatus::ReadWhenWriteDataAvailable {
                            read_completion_instant,
                            node_idx,
                        },
                    );
                }
            }
            log::debug!(
                "In-progress writes count after Reader {} read {}: {}",
                node_idx,
                uuid,
                state.in_progress_writes.len()
            );
        }

        ReaderReply::FailedRead {
            read_duration,
            bft_error,
            node_idx,
        } => {
            log::error!("Reader {} failed read, error: {}", node_idx, bft_error);
            let read_duration_nanos = u64_nanos(read_duration);
            update_op_stat(
                &mut state.stats.global_read.op,
                &mut state.stats.nodes_reads.get_mut(node_idx).unwrap().op,
                read_duration_nanos,
                false,
            );
        }

        ReaderReply::Completed { node_idx } => {
            log::info!("Reader {} completed", node_idx);
            readers_to_go.remove(&node_idx);
            if writers_to_go.is_empty() && readers_to_go.is_empty() {
                return true;
            }
        }
    }

    false
}

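/// Records that `node_idx` has observed transaction `uuid`, updating that node's
/// round-trip statistic. When it is the last node awaiting the read, the global
/// round-trip statistic is updated too; otherwise the transaction is re-inserted with
/// the remaining nodes. Returns `false` if `node_idx` was not awaiting the read.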
fn complete_read(
    role: &'static str,
    state: &mut BftBenchmarkState,
    mut nodes_awaiting_read: HashSet<usize>,
    uuid: &Uuid,
    node_idx: usize,
    write_start: &Instant,
    node_round_trip: Duration,
) -> bool {
    let node_round_trip_nanos = u64_nanos(node_round_trip);

    log::debug!(
        "{} {} completed in-progress transaction {} in {} nanos",
        role,
        node_idx,
        uuid,
        node_round_trip_nanos
    );

    if nodes_awaiting_read.remove(&node_idx) {
        let now = Instant::now();
        update_stat(
            &mut state
                .stats
                .nodes_reads
                .get_mut(node_idx)
                .unwrap()
                .round_trip,
            now,
            node_round_trip_nanos,
        );
        if nodes_awaiting_read.is_empty() {
            log::debug!(
                "{} {} performed last read for transaction {}",
                role,
                node_idx,
                uuid
            );
            update_stat(
                &mut state.stats.global_read.round_trip,
                now,
                node_round_trip_nanos,
            );
        } else {
            log::debug!(
                "After read from {} {}, {} reads still pending for transaction {}",
                role,
                node_idx,
                nodes_awaiting_read.len(),
                uuid
            );
            state.in_progress_writes.insert(
                *uuid,
                WriteStatus::Written {
                    write_start: *write_start,
                    nodes_awaiting_read,
                },
            );
        }
        true
    } else {
        false
    }
}

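/// Broadcasts a stop request to every writer and reader worker.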
fn request_stop(state: &mut BftBenchmarkState) {
    log::info!("Signalling workers to stop");
    state
        .tx_writers_control
        .send(WorkerRequest::Stop())
        .expect("Internal error: cannot send writers completion request");
    state
        .tx_readers_control
        .send(WorkerRequest::Stop())
        .expect("Internal error: cannot send readers completion request");
}

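/// Records an operation sample in both the global and the per-node statistic, bucketed
/// by success or failure.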
fn update_op_stat(
    global_op_stat: &mut OpStat,
    node_op_stat: &mut OpStat,
    duration_nanos: u64,
    ok: bool,
) {
    let now = Instant::now();
    if ok {
        update_stat(&mut global_op_stat.successful, now, duration_nanos);
        update_stat(&mut node_op_stat.successful, now, duration_nanos);
    } else {
        update_stat(&mut global_op_stat.failed, now, duration_nanos);
        update_stat(&mut node_op_stat.failed, now, duration_nanos);
    }
}

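/// Records a single sample: updates the latency histogram (in microseconds) and bumps
/// the counter with the time of the observation.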
fn update_stat(stat: &mut Stat, now: Instant, duration_nanos: u64) {
    increment_histogram(&mut stat.histogram, duration_nanos / 1000);
    stat.counter.count += 1;
    stat.counter.now = now;
}

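/// Increments the histogram bucket for the given latency, logging rather than
/// propagating any histogram error.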
fn increment_histogram(histo: &mut Histogram, elapsed_micros: u64) {
    match histo.increment(elapsed_micros) {
        Ok(_) => {}
        Err(_) => log::error!(
            "Internal error: cannot increment histogram for {} micros",
            elapsed_micros
        ),
    }
}

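/// Converts a `Duration` to whole nanoseconds as a `u64`, panicking if the value does not fit.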
fn u64_nanos(duration: Duration) -> u64 {
    u64::try_from(duration.as_nanos()).expect("Internal error: duration nanos don't fit 64 bits")
}