bft_bench_core/
lib.rs

1//! A benchmarking framework geared towards BFT ordering libraries and platforms.
2
3use std::collections::{HashMap, HashSet};
4use std::time::{Duration, Instant};
5
6use histogram::Histogram;
7use tokio::task::spawn;
8use tokio::time::sleep;
9use tokio::{sync::broadcast, sync::mpsc};
10use uuid::Uuid;
11
12use reader::ReaderReply;
13use stats::*;
14use worker::WorkerRequest;
15use writer::WriterReply;
16pub use {bft_binding::*, config::*, result::*};
17
18pub mod bft_binding;
19pub mod config;
20pub mod result;
21pub mod stats;
22
23mod reader;
24mod worker;
25mod writer;
26
/// Size in bytes of a [`Uuid`] (128 bits).
pub const UUID_SIZE: usize = 16;

/// Capacity of the worker control channels and of the report-trigger channel.
const CONTROL_CHANNELS_BUFFER: usize = 1;
/// Capacity, in messages, of the channels carrying worker replies (128 Mi).
const DATA_CHANNELS_BUFFER: usize = 128 << 20;
31
32enum WriteStatus {
33    Written {
34        write_start: Instant,
35        nodes_awaiting_read: HashSet<usize>,
36    },
37    ReadWhenWriteDataAvailable {
38        read_completion_instant: Instant,
39        node_idx: usize,
40    },
41}
42
43struct BftBenchmarkState {
44    in_progress_writes: HashMap<Uuid, WriteStatus>,
45    tx_writers_control: broadcast::Sender<WorkerRequest>,
46    rx_incoming_writes: mpsc::Receiver<WriterReply>,
47    tx_readers_control: broadcast::Sender<WorkerRequest>,
48    rx_incoming_reads: mpsc::Receiver<ReaderReply>,
49    stats: Stats,
50}
51
52/// Runs a benchmark using the given [`BftBinding`] and [`Config`] and produces [`Stats`].
53pub async fn run<B: BftBinding + 'static>(config: Config, mut bft_binding: B) -> Result<Reports> {
54    let (tx_writers_control, rx_writers_control) = broadcast::channel(CONTROL_CHANNELS_BUFFER);
55    let (tx_incoming_writes, rx_incoming_writes) = mpsc::channel(DATA_CHANNELS_BUFFER);
56    let (tx_readers_control, rx_readers_control) = broadcast::channel(CONTROL_CHANNELS_BUFFER);
57    let (tx_incoming_reads, rx_incoming_reads) = mpsc::channel(DATA_CHANNELS_BUFFER);
58
59    let mut state = BftBenchmarkState {
60        in_progress_writes: HashMap::<Uuid, WriteStatus>::new(),
61        tx_writers_control,
62        rx_incoming_writes,
63        tx_readers_control,
64        rx_incoming_reads,
65        stats: Stats::new(
66            Instant::now(),
67            config.nodes.len(),
68            config
69                .nodes
70                .iter()
71                .filter(|node| matches!(**node, Node::ReadWrite(_)))
72                .collect::<Vec<_>>()
73                .len(),
74        ),
75    };
76
77    let mut writers_to_go = config
78        .nodes
79        .iter()
80        .enumerate()
81        .map(|(node_idx, _)| node_idx)
82        .collect::<HashSet<_>>();
83
84    let mut write_accesses = HashMap::<usize, B::Writer>::new();
85    let mut read_accesses = HashMap::<usize, B::Reader>::new();
86    for (node_idx, node) in config.nodes.into_iter().enumerate() {
87        match bft_binding.access(node).await {
88            NodeAccess::ReadWriteAccess { reader, writer } => {
89                write_accesses.insert(node_idx, writer);
90                read_accesses.insert(node_idx, reader);
91            }
92            NodeAccess::WriteOnlyAccess { writer } => {
93                write_accesses.insert(node_idx, writer);
94            }
95        }
96    }
97
98    let read_indices = start_reads::<B>(
99        read_accesses,
100        rx_readers_control,
101        tx_incoming_reads,
102        config.read_grace,
103    );
104    let mut readers_to_go = read_indices.clone();
105
106    log::info!("Read nodes: {:?}", read_indices);
107
108    start_writes::<B>(
109        write_accesses,
110        config.write_interval,
111        rx_writers_control,
112        tx_incoming_writes,
113    );
114
115    let (tx_report, mut rx_report) = mpsc::channel(CONTROL_CHANNELS_BUFFER);
116    let tx_report_periodic = tx_report.clone();
117    spawn(async move {
118        sleep(config.run_duration).await;
119        tx_report
120            .send(true)
121            .await
122            .expect("Cannot send benchmark completion request");
123    });
124    let periodic_report_awaiter = config.report_interval.map_or(spawn(async move {}), |i| {
125        spawn(async move {
126            loop {
127                sleep(i).await;
128                if tx_report_periodic.send(false).await.is_err() {
129                    log::debug!("Report channel closed, stopping periodic report");
130                    break;
131                }
132            }
133        })
134    });
135
136    loop {
137        tokio::select! {
138            Some(write) = state.rx_incoming_writes.recv() => {
139                if handle_writer_reply(&mut state, &mut writers_to_go, &mut readers_to_go, &read_indices, write) {
140                    break;
141                }
142            }
143            Some(read) = state.rx_incoming_reads.recv() => {
144                if handle_reader_reply(&mut state, &mut writers_to_go, &mut readers_to_go, read) {
145                    break;
146                }
147            }
148            Some(complete) = rx_report.recv() => {
149                if complete {
150                    log::info!("Benchmark duration elapsed, requesting readers' and writers' completion");
151                    rx_report.close();
152                    request_stop(&mut state);
153                } else {
154                    log::info!("Periodic stats follow: {}", Into::<Reports>::into(&state.stats));
155                }
156            }
157        }
158    }
159
160    let _ = periodic_report_awaiter.await;
161
162    Ok((&state.stats).into())
163}
164
165fn start_writes<B: BftBinding + 'static>(
166    write_accesses: HashMap<usize, B::Writer>,
167    write_interval: Duration,
168    rx_writers_control: broadcast::Receiver<WorkerRequest>,
169    tx_incoming_writes: mpsc::Sender<WriterReply>,
170) {
171    log::info!("Starting writers");
172    for (node_idx, writer) in write_accesses.into_iter() {
173        log::info!("Starting writer for node {}", node_idx);
174        spawn(writer::write::<B::Writer>(
175            writer,
176            node_idx,
177            write_interval,
178            rx_writers_control.resubscribe(),
179            tx_incoming_writes.clone(),
180        ));
181    }
182}
183
184fn start_reads<B: BftBinding + 'static>(
185    read_accesses: HashMap<usize, B::Reader>,
186    rx_readers_control: broadcast::Receiver<WorkerRequest>,
187    tx_incoming_reads: mpsc::Sender<ReaderReply>,
188    read_grace: Duration,
189) -> HashSet<usize> {
190    log::info!("Starting readers");
191    let mut read_indices = HashSet::<usize>::new();
192    for (node_idx, reader) in read_accesses.into_iter() {
193        log::info!("Starting reader for node {}", node_idx);
194        read_indices.insert(node_idx);
195        spawn(reader::read::<B::Reader>(
196            node_idx,
197            reader,
198            rx_readers_control.resubscribe(),
199            tx_incoming_reads.clone(),
200            read_grace,
201        ));
202    }
203    read_indices
204}
205
206fn handle_writer_reply(
207    state: &mut BftBenchmarkState,
208    writers_to_go: &mut HashSet<usize>,
209    readers_to_go: &mut HashSet<usize>,
210    read_indices: &HashSet<usize>,
211    write: WriterReply,
212) -> bool {
213    match write {
214        WriterReply::SuccessfulWrite {
215            write_start,
216            write_duration,
217            uuid,
218            node_idx,
219        } => {
220            log::debug!("Writer {} succeeded write {}", node_idx, uuid);
221            let write_duration_nanos = u64_nanos(write_duration);
222            update_op_stat(
223                &mut state.stats.global_write,
224                state.stats.nodes_writes.get_mut(node_idx).unwrap(),
225                write_duration_nanos,
226                true,
227            );
228            match state.in_progress_writes.remove(&uuid) {
229                Some(WriteStatus::ReadWhenWriteDataAvailable {
230                    read_completion_instant,
231                    node_idx,
232                }) => {
233                    let nodes_awaiting_read = read_indices.clone();
234                    if !complete_read(
235                        "Writer",
236                        state,
237                        nodes_awaiting_read,
238                        &uuid,
239                        node_idx,
240                        &write_start,
241                        read_completion_instant - write_start,
242                    ) {
243                        log::error!(
244                            "Writer {} attempted read from non-read node {}",
245                            node_idx,
246                            uuid
247                        );
248                        panic!(
249                            "Writer {} attempted read from non-read node {}",
250                            node_idx, uuid
251                        );
252                    }
253                }
254                Some(WriteStatus::Written { .. }) => {
255                    log::error!(
256                        "Writer {} trying to perform a duplicate write {}",
257                        node_idx,
258                        uuid
259                    );
260                    panic!(
261                        "Writer {} trying to perform a duplicate write {}",
262                        node_idx, uuid
263                    );
264                }
265                None => {
266                    state.in_progress_writes.insert(
267                        uuid,
268                        WriteStatus::Written {
269                            write_start,
270                            nodes_awaiting_read: read_indices.clone(),
271                        },
272                    );
273                }
274            }
275        }
276        WriterReply::FailedWrite {
277            write_duration,
278            uuid,
279            node_idx,
280        } => {
281            log::error!("Writer {} failed write {}", node_idx, uuid);
282            let write_duration_nanos = u64_nanos(write_duration);
283            update_op_stat(
284                &mut state.stats.global_write,
285                state.stats.nodes_writes.get_mut(node_idx).unwrap(),
286                write_duration_nanos,
287                false,
288            );
289        }
290        WriterReply::Completed { node_idx } => {
291            log::info!("Writer {} completed", node_idx);
292            writers_to_go.remove(&node_idx);
293            log::info!("Writers to go: {:?}", writers_to_go);
294            log::info!("Readers to go: {:?}", readers_to_go);
295            if writers_to_go.is_empty() && readers_to_go.is_empty() {
296                return true;
297            }
298        }
299    }
300
301    false
302}
303
304fn handle_reader_reply(
305    state: &mut BftBenchmarkState,
306    writers_to_go: &mut HashSet<usize>,
307    readers_to_go: &mut HashSet<usize>,
308    read: ReaderReply,
309) -> bool {
310    match read {
311        ReaderReply::SuccessfulRead {
312            read_completion_instant,
313            read_duration,
314            uuid,
315            node_idx,
316        } => {
317            log::debug!("Reader {} succeeded read {}", node_idx, uuid);
318            let read_duration_nanos = u64_nanos(read_duration);
319            update_op_stat(
320                &mut state.stats.global_read.op,
321                &mut state.stats.nodes_reads.get_mut(node_idx).unwrap().op,
322                read_duration_nanos,
323                true,
324            );
325            match state.in_progress_writes.remove(&uuid) {
326                Some(WriteStatus::Written {
327                    write_start,
328                    nodes_awaiting_read,
329                }) => {
330                    if !complete_read(
331                        "Reader",
332                        state,
333                        nodes_awaiting_read,
334                        &uuid,
335                        node_idx,
336                        &write_start,
337                        write_start.elapsed(),
338                    ) {
339                        log::error!("Duplicate read {} from reader {}: ", uuid, node_idx);
340                        panic!("Duplicate read {} from reader {}: ", uuid, node_idx);
341                    }
342                }
343
344                Some(WriteStatus::ReadWhenWriteDataAvailable { .. }) => {
345                    log::error!("Duplicate read {} from reader {}: ", uuid, node_idx);
346                    panic!("Duplicate read {} from reader {}: ", uuid, node_idx);
347                }
348
349                None => {
350                    log::debug!(
351                        "Reader {} found that write data for {} is not yet available",
352                        node_idx,
353                        uuid
354                    );
355                    state.in_progress_writes.insert(
356                        uuid,
357                        WriteStatus::ReadWhenWriteDataAvailable {
358                            read_completion_instant,
359                            node_idx,
360                        },
361                    );
362                }
363            }
364            log::debug!(
365                "In-progress writes count after Reader {} read {}: {}",
366                node_idx,
367                uuid,
368                state.in_progress_writes.len()
369            );
370        }
371
372        ReaderReply::FailedRead {
373            read_duration,
374            bft_error,
375            node_idx,
376        } => {
377            log::error!("Reader {} failed read, error: {}", node_idx, bft_error);
378            let read_duration_nanos = u64_nanos(read_duration);
379            update_op_stat(
380                &mut state.stats.global_read.op,
381                &mut state.stats.nodes_reads.get_mut(node_idx).unwrap().op,
382                read_duration_nanos,
383                false,
384            );
385        }
386
387        ReaderReply::Completed { node_idx } => {
388            log::info!("Reader {} completed", node_idx);
389            readers_to_go.remove(&node_idx);
390            if writers_to_go.is_empty() && readers_to_go.is_empty() {
391                return true;
392            }
393        }
394    }
395
396    false
397}
398
399fn complete_read(
400    role: &'static str,
401    state: &mut BftBenchmarkState,
402    mut nodes_awaiting_read: HashSet<usize>,
403    uuid: &Uuid,
404    node_idx: usize,
405    write_start: &Instant,
406    node_round_trip: Duration,
407) -> bool {
408    let node_round_trip_nanos = u64_nanos(node_round_trip);
409
410    log::debug!(
411        "{} {} completed in-progress transaction {} in {} nanos",
412        role,
413        node_idx,
414        uuid,
415        node_round_trip_nanos
416    );
417
418    if nodes_awaiting_read.remove(&node_idx) {
419        let now = Instant::now();
420        update_stat(
421            &mut state
422                .stats
423                .nodes_reads
424                .get_mut(node_idx)
425                .unwrap()
426                .round_trip,
427            now,
428            node_round_trip_nanos,
429        );
430        if nodes_awaiting_read.is_empty() {
431            // Last read
432            log::debug!(
433                "{} {} performed last read for transaction {}",
434                role,
435                node_idx,
436                uuid
437            );
438            update_stat(
439                &mut state.stats.global_read.round_trip,
440                now,
441                node_round_trip_nanos,
442            );
443        } else {
444            log::debug!(
445                "After read from {} {}, {} reads still pending for transaction {}",
446                role,
447                node_idx,
448                nodes_awaiting_read.len(),
449                uuid
450            );
451            state.in_progress_writes.insert(
452                *uuid,
453                WriteStatus::Written {
454                    write_start: *write_start,
455                    nodes_awaiting_read,
456                },
457            );
458        };
459        true
460    } else {
461        false
462    }
463}
464
465fn request_stop(state: &mut BftBenchmarkState) {
466    log::info!("Signalling workers to stop");
467    state
468        .tx_writers_control
469        .send(WorkerRequest::Stop())
470        .expect("Internal error: cannot send writers completion request");
471    state
472        .tx_readers_control
473        .send(WorkerRequest::Stop())
474        .expect("Internal error: cannot send readers completion request");
475}
476
477fn update_op_stat(
478    global_op_stat: &mut OpStat,
479    node_op_stat: &mut OpStat,
480    duration_nanos: u64,
481    ok: bool,
482) {
483    let now = Instant::now();
484    if ok {
485        update_stat(&mut global_op_stat.successful, now, duration_nanos);
486        update_stat(&mut node_op_stat.successful, now, duration_nanos);
487    } else {
488        update_stat(&mut global_op_stat.failed, now, duration_nanos);
489        update_stat(&mut node_op_stat.failed, now, duration_nanos);
490    }
491}
492
493fn update_stat(stat: &mut Stat, now: Instant, duration_nanos: u64) {
494    increment_histogram(&mut stat.histogram, duration_nanos / 1000);
495    stat.counter.count += 1;
496    stat.counter.now = now;
497}
498
499fn increment_histogram(histo: &mut Histogram, elapsed_micros: u64) {
500    match histo.increment(elapsed_micros) {
501        Ok(_) => {}
502        Err(_) => log::error!(
503            "Internal error: cannot increment histogram for {} micros",
504            elapsed_micros
505        ),
506    }
507}
508
/// Converts `duration` to whole nanoseconds as a `u64`.
///
/// # Panics
///
/// If the duration exceeds `u64::MAX` nanoseconds (roughly 584 years).
fn u64_nanos(duration: Duration) -> u64 {
    let nanos: u128 = duration.as_nanos();
    nanos
        .try_into()
        .expect("Internal error: duration nanos don't fit 64 bits")
}