bftgrid_sim/
lib.rs

1//! Single-threaded simulation of a network of actors.
2
3use std::{
4    any::Any,
5    collections::{BinaryHeap, HashMap},
6    fmt::Debug,
7    future::Future,
8    marker::PhantomData,
9    mem,
10    ops::{Add, ControlFlow},
11    sync::{Arc, Mutex},
12    time::{Duration, Instant},
13};
14
15use bftgrid_core::actor::{
16    ActorControl, ActorMsg, ActorRef, ActorSystemHandle, AnActorMsg, DynMsgHandler,
17    MessageNotSupported, P2PNetworkClient, P2PNetworkResult, UntypedMsgHandler,
18};
19use rand_chacha::{
20    ChaCha8Rng,
21    rand_core::{RngCore, SeedableRng},
22};
23
24pub type UntypedHandlerBox = Box<dyn UntypedMsgHandler>;
25
26struct SimulationMsgHandler<MsgT> {
27    handler: DynMsgHandler<MsgT>,
28}
29
30/// A blanket [`UntypedMsgHandler`] implementation for any [`MsgHandler<MsgT>`]
31///  to allow any boxed typed actor to be used as a network input actor.
32// The manual supertrait upcasting approach
33//  (https://quinedot.github.io/rust-learning/dyn-trait-combining.html#manual-supertrait-upcasting)
34//  would not work in this case due to the `MsgT` type parameter being
35//  method-bound rather than trait-bound in a blanket implementation for `T: TypedMsgHandler<MsgT>`.
36//  Somewhat sadly, this means that a generic [`UntypedMsgHandler`] must be costructed via a second level
37//  of boxing (`Box<dyn UntypedMsgHandler>`).
38impl<MsgT> UntypedMsgHandler for SimulationMsgHandler<MsgT>
39where
40    MsgT: ActorMsg,
41{
42    fn receive_untyped(
43        &mut self,
44        message: AnActorMsg,
45    ) -> Result<Option<ActorControl>, MessageNotSupported> {
46        match (message.clone() as Box<dyn Any>).downcast::<MsgT>() {
47            Ok(typed_message) => Result::Ok(self.handler.receive(*typed_message)),
48            Err(_) => {
49                // MsgT may be a trait object, so we retry after boxing the message
50                match (Box::new(message) as Box<dyn Any>).downcast::<MsgT>() {
51                    Ok(typed_message) => Result::Ok(self.handler.receive(*typed_message)),
52                    Err(_) => Result::Err(MessageNotSupported()),
53                }
54            }
55        }
56    }
57}
58
/// Fixed seed of the simulation's pseudo-random number generator.
const SEED: u64 = 10;
/// Upper bound of the pseudo-random delay added to the current simulated instant
/// when an event is scheduled.
const MAX_RANDOM_DURATION: Duration = Duration::from_millis(1_000);
61
/// Clock of the simulation.
pub struct SimulatedClock {
    /// The simulated "now": starts at the simulation's start instant and is moved
    /// to the instant of each event taken off the events queue.
    current_instant: Instant,
}
65
66#[derive(Clone, Debug)]
67pub struct InternalEvent {
68    node: Arc<String>,
69    handler: Arc<String>,
70    event: Box<dyn ActorMsg>,
71    delay: Option<Duration>,
72}
73
74#[derive(Clone, Debug)]
75pub enum SimulationEvent {
76    ClientSend {
77        // Injected before the simulation starts, is assigned an instant and reinjected as ClientRequest
78        to_node: Arc<String>,
79        event: Box<dyn ActorMsg>,
80    },
81    ClientRequest {
82        // Sent as internal event to a node's client_request_handler
83        to_node: Arc<String>,
84        event: Box<dyn ActorMsg>,
85    },
86    P2PSend {
87        // Produced by the simulated network, is assigned an instant and reinjected as P2PRequest
88        to_node: Arc<String>,
89        event: Box<dyn ActorMsg>,
90    },
91    P2PRequest {
92        // Sent as internal event to a node's p2p_request_handler
93        node: Arc<String>,
94        event: Box<dyn ActorMsg>,
95    },
96    Internal {
97        // Produced by internal modules for internal modules
98        event: InternalEvent,
99    },
100    SimulationEnd,
101}
102
103#[derive(Clone, Debug)]
104pub struct SimulationEventAtInstant {
105    instant: Instant,
106    event: SimulationEvent,
107}
108
109impl PartialEq for SimulationEventAtInstant {
110    fn eq(&self, other: &Self) -> bool {
111        self.instant == other.instant
112    }
113}
114
115impl Eq for SimulationEventAtInstant {}
116
117impl Ord for SimulationEventAtInstant {
118    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
119        other.instant.cmp(&self.instant) // Reverse order: earlier is bigger
120    }
121}
122
123impl PartialOrd for SimulationEventAtInstant {
124    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
125        Some(self.cmp(other))
126    }
127}
128
129pub struct NodeDescriptor {
130    pub client_request_handler: Option<Arc<String>>,
131    pub p2p_request_handler: Option<Arc<String>>,
132    pub all_handlers: HashMap<Arc<String>, Arc<Mutex<Option<UntypedHandlerBox>>>>,
133}
134
135impl std::fmt::Debug for NodeDescriptor {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("NodeDescriptor")
138            .field("client_request_handler", &self.client_request_handler)
139            .field("p2p_request_handler", &self.p2p_request_handler)
140            .field(
141                "all_handlers",
142                &self.all_handlers.keys().collect::<Vec<_>>(),
143            )
144            .finish()
145    }
146}
147
148impl NodeDescriptor {
149    pub fn new<C: Into<String>, P: Into<String>>(
150        client_request_handler: Option<C>,
151        p2p_request_handler: Option<P>,
152    ) -> Self {
153        NodeDescriptor {
154            client_request_handler: client_request_handler.map(|h| Arc::new(h.into())),
155            p2p_request_handler: p2p_request_handler.map(|h| Arc::new(h.into())),
156            all_handlers: Default::default(),
157        }
158    }
159}
160
161impl Default for NodeDescriptor {
162    fn default() -> Self {
163        Self::new::<String, String>(Default::default(), Default::default())
164    }
165}
166
167type Topology = HashMap<String, NodeDescriptor>;
168
169struct SimulatedActor<MsgT> {
170    node_id: Arc<String>,
171    name: Arc<String>,
172    events_buffer: Arc<Mutex<Vec<InternalEvent>>>,
173    topology: Arc<Mutex<Topology>>,
174    tokio_runtime: Arc<tokio::runtime::Runtime>,
175    message_type: PhantomData<MsgT>,
176}
177
178impl<MsgT> std::fmt::Debug for SimulatedActor<MsgT> {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("SimulatedActor")
181            .field("node_id", &self.node_id)
182            .field("name", &self.name)
183            .finish()
184    }
185}
186
187#[derive(Debug)]
188pub struct SimulatedActorRef<MsgT> {
189    actor: Arc<Mutex<SimulatedActor<MsgT>>>,
190}
191
192impl<MsgT> Clone for SimulatedActorRef<MsgT> {
193    fn clone(&self) -> Self {
194        SimulatedActorRef {
195            actor: self.actor.clone(),
196        }
197    }
198}
199
200impl<MsgT> ActorRef<MsgT> for SimulatedActorRef<MsgT>
201where
202    MsgT: ActorMsg,
203{
204    fn send(&mut self, message: MsgT, delay: Option<Duration>) {
205        let actor = self.actor.lock().unwrap();
206        actor.events_buffer.lock().unwrap().push(InternalEvent {
207            node: actor.node_id.clone(),
208            handler: actor.name.clone(),
209            event: Box::new(message),
210            delay,
211        });
212    }
213
214    fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
215        let actor = self.actor.lock().unwrap();
216        let mut topology = actor.topology.lock().unwrap();
217        let node = topology.get_mut(actor.node_id.as_ref()).unwrap();
218        node.all_handlers.insert(
219            actor.name.clone(),
220            Arc::new(Mutex::new(Some(Box::new(SimulationMsgHandler { handler })))),
221        );
222    }
223
224    fn spawn_async_send(
225        &mut self,
226        f: impl Future<Output = MsgT> + 'static,
227        delay: Option<Duration>,
228    ) {
229        let runtime = self.actor.lock().unwrap().tokio_runtime.clone();
230        let res = runtime.block_on(f);
231        self.send(res, delay);
232    }
233
234    fn spawn_thread_blocking_send(
235        &mut self,
236        f: impl FnOnce() -> MsgT + Send + 'static,
237        delay: Option<Duration>,
238    ) {
239        self.send(f(), delay);
240    }
241}
242
243pub struct Simulation {
244    topology: Arc<Mutex<Topology>>,
245    exited_actors: Arc<Mutex<Vec<Arc<String>>>>,
246    internal_events_buffer: Arc<Mutex<Vec<InternalEvent>>>,
247    events_queue: Arc<Mutex<BinaryHeap<SimulationEventAtInstant>>>,
248    clock: Arc<Mutex<SimulatedClock>>,
249    random: ChaCha8Rng,
250    end_instant: Instant,
251    tokio_runtime: Arc<tokio::runtime::Runtime>,
252}
253
254impl Simulation {
255    pub fn new(topology: Topology, start_instant: Instant, end_instant: Instant) -> Simulation {
256        Simulation {
257            topology: Arc::new(Mutex::new(topology)),
258            exited_actors: Default::default(),
259            internal_events_buffer: Default::default(),
260            events_queue: Default::default(),
261            clock: Arc::new(Mutex::new(SimulatedClock {
262                current_instant: start_instant,
263            })),
264            random: ChaCha8Rng::seed_from_u64(SEED),
265            end_instant,
266            tokio_runtime: Arc::new(
267                tokio::runtime::Builder::new_multi_thread()
268                    .enable_all()
269                    .build()
270                    .unwrap(),
271            ),
272        }
273    }
274
275    pub fn client_send<MsgT>(&mut self, to_node: String, message: MsgT)
276    where
277        MsgT: ActorMsg + 'static,
278    {
279        let instant = self.instant_of_client_request_send();
280        self.events_queue
281            .lock()
282            .unwrap()
283            .push(SimulationEventAtInstant {
284                instant,
285                event: SimulationEvent::ClientSend {
286                    to_node: Arc::new(to_node),
287                    event: Box::new(message),
288                },
289            })
290    }
291
292    pub fn run(mut self) -> Vec<SimulationEventAtInstant> {
293        let mut result = Vec::new();
294        self.events_queue
295            .lock()
296            .unwrap()
297            .push(SimulationEventAtInstant {
298                instant: self.end_instant,
299                event: SimulationEvent::SimulationEnd,
300            });
301
302        while !self.events_queue.lock().unwrap().is_empty() {
303            let mut mutex_buffer = self.internal_events_buffer.lock().unwrap();
304            let mut events_buffer = Vec::new();
305            mem::swap(&mut *mutex_buffer, &mut events_buffer);
306            drop(mutex_buffer);
307            for event in events_buffer {
308                let instant = self.instant_of_internal_event();
309                self.events_queue
310                    .lock()
311                    .unwrap()
312                    .push(SimulationEventAtInstant {
313                        instant,
314                        event: SimulationEvent::Internal { event },
315                    })
316            }
317
318            let e = self.events_queue.lock().unwrap().pop().unwrap();
319            let e_clone = e.clone();
320            result.push(e_clone);
321            self.clock.lock().unwrap().current_instant = e.instant;
322
323            if let ControlFlow::Break(_) = self.handle_event(e) {
324                break;
325            }
326        }
327        result
328    }
329
330    fn handle_event(&mut self, e: SimulationEventAtInstant) -> ControlFlow<()> {
331        match e.event {
332            SimulationEvent::ClientSend { to_node, event } => {
333                self.handle_client_send(to_node, event)
334            }
335            SimulationEvent::ClientRequest { to_node, event } => {
336                self.handle_client_request(to_node, event)
337            }
338            SimulationEvent::P2PSend { to_node, event } => self.handle_p2p_send(to_node, event),
339            SimulationEvent::P2PRequest { node, event } => self.handle_p2p_request(node, event),
340            SimulationEvent::Internal {
341                event:
342                    InternalEvent {
343                        node,
344                        handler,
345                        event,
346                        delay,
347                    },
348            } => self.handle_internal_event(delay, e.instant, node, handler, event),
349            SimulationEvent::SimulationEnd => return ControlFlow::Break(()),
350        }
351        ControlFlow::Continue(())
352    }
353
354    fn handle_internal_event(
355        &mut self,
356        delay: Option<Duration>,
357        instant: Instant,
358        node: Arc<String>,
359        handler: Arc<String>,
360        event: Box<dyn ActorMsg>,
361    ) {
362        match delay {
363            Some(duration) => {
364                self.events_queue
365                    .lock()
366                    .unwrap()
367                    .push(SimulationEventAtInstant {
368                        instant: instant.add(duration),
369                        event: SimulationEvent::Internal {
370                            event: InternalEvent {
371                                node,
372                                handler,
373                                event,
374                                delay: None,
375                            },
376                        },
377                    });
378            }
379            None => {
380                if self.exited_actors.lock().unwrap().contains(&handler) {
381                    panic!("message sent to actor that has exited")
382                }
383                let mut topology = self.topology.lock().unwrap();
384                let mut removed_node = topology
385                    .remove(node.as_ref())
386                    .unwrap_or_else(|| panic!("node {:?} unknown", node));
387                let removed_handler_arc = removed_node
388                    .all_handlers
389                    .remove(&handler)
390                    .unwrap_or_else(|| {
391                        panic!("handler {:?} not found for node {:?}", handler, node)
392                    });
393                topology.insert((*node).clone(), removed_node);
394                drop(topology); // So that a handler with access to the actor system can lock it again in `crate` and/or `set_handler`
395                if let Some(control) = removed_handler_arc
396                    .lock()
397                    .unwrap()
398                    .as_mut()
399                    .unwrap_or_else(|| {
400                        panic!("handler {:?} not found for node {:?}", handler, node)
401                    })
402                    .receive_untyped(event.clone())
403                    .unwrap_or_else(|_| {
404                        panic!(
405                            "handler {:?} for node {:?} cannot handle event {:?}",
406                            handler, node, event
407                        )
408                    })
409                {
410                    match control {
411                        ActorControl::Exit() => self.exited_actors.lock().unwrap().push(handler),
412                    }
413                } else {
414                    let mut topology = self.topology.lock().unwrap();
415                    let mut removed_node = topology
416                        .remove(node.as_ref())
417                        .unwrap_or_else(|| panic!("node {:?} not found", node));
418                    removed_node
419                        .all_handlers
420                        .insert(handler, removed_handler_arc.clone());
421                    topology.insert((*node).clone(), removed_node);
422                };
423            }
424        }
425    }
426
427    fn handle_p2p_request(&mut self, node: Arc<String>, event: Box<dyn ActorMsg>) {
428        let internal_event_instant = self.instant_of_internal_event();
429        let p2p_request_handler = self
430            .topology
431            .lock()
432            .unwrap()
433            .get(node.as_ref())
434            .unwrap()
435            .p2p_request_handler
436            .as_ref()
437            .unwrap_or_else(|| panic!("p2p request handler unset for node {:?}", node))
438            .clone();
439        self.events_queue
440            .lock()
441            .unwrap()
442            .push(SimulationEventAtInstant {
443                instant: internal_event_instant,
444                event: SimulationEvent::Internal {
445                    event: InternalEvent {
446                        node,
447                        handler: p2p_request_handler,
448                        event,
449                        delay: None,
450                    },
451                },
452            })
453    }
454
455    fn handle_p2p_send(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
456        let p2p_request_arrival_instant = self.instant_of_p2p_request_arrival();
457        self.events_queue
458            .lock()
459            .unwrap()
460            .push(SimulationEventAtInstant {
461                instant: p2p_request_arrival_instant,
462                event: SimulationEvent::P2PRequest {
463                    node: to_node,
464                    event,
465                },
466            });
467    }
468
469    fn handle_client_request(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
470        let internal_event_instant = self.instant_of_internal_event();
471        let client_request_handler = self
472            .topology
473            .lock()
474            .unwrap()
475            .get(to_node.as_ref())
476            .unwrap_or_else(|| panic!("node {:?} unknown", to_node))
477            .client_request_handler
478            .as_ref()
479            .unwrap_or_else(|| panic!("client request handler unset for node {:?}", to_node))
480            .clone();
481        self.events_queue
482            .lock()
483            .unwrap()
484            .push(SimulationEventAtInstant {
485                instant: internal_event_instant,
486                event: SimulationEvent::Internal {
487                    event: InternalEvent {
488                        node: to_node,
489                        handler: client_request_handler,
490                        event,
491                        delay: None,
492                    },
493                },
494            })
495    }
496
497    fn handle_client_send(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
498        let client_request_arrival_instant = self.instant_of_client_request_arrival();
499        self.events_queue
500            .lock()
501            .unwrap()
502            .push(SimulationEventAtInstant {
503                instant: client_request_arrival_instant,
504                event: SimulationEvent::ClientRequest { to_node, event },
505            })
506    }
507
508    fn instant_of_internal_event(&mut self) -> Instant {
509        let pseudo_random_between_zero_and_one = self.random.next_u64() as f64 / u64::MAX as f64;
510        let pseudo_random_duration =
511            MAX_RANDOM_DURATION.mul_f64(pseudo_random_between_zero_and_one);
512        self.clock
513            .lock()
514            .unwrap()
515            .current_instant
516            .add(pseudo_random_duration)
517    }
518
519    fn instant_of_client_request_send(&mut self) -> Instant {
520        self.instant_of_internal_event()
521    }
522
523    fn instant_of_client_request_arrival(&mut self) -> Instant {
524        self.instant_of_internal_event()
525    }
526
527    fn instant_of_p2p_request_send(&mut self) -> Instant {
528        self.instant_of_internal_event()
529    }
530
531    fn instant_of_p2p_request_arrival(&mut self) -> Instant {
532        self.instant_of_internal_event()
533    }
534}
535
536impl Clone for Simulation {
537    fn clone(&self) -> Self {
538        Simulation {
539            topology: self.topology.clone(),
540            exited_actors: self.exited_actors.clone(),
541            internal_events_buffer: self.internal_events_buffer.clone(),
542            events_queue: self.events_queue.clone(),
543            clock: self.clock.clone(),
544            random: self.random.clone(),
545            end_instant: self.end_instant,
546            tokio_runtime: self.tokio_runtime.clone(),
547        }
548    }
549}
550
551impl std::fmt::Debug for Simulation {
552    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553        f.debug_struct("Simulation")
554            .field("topology", &self.topology)
555            .field("random", &self.random)
556            .field("end_instant", &self.end_instant)
557            .finish()
558    }
559}
560
561impl ActorSystemHandle for Simulation {
562    type ActorRefT<MsgT>
563        = SimulatedActorRef<MsgT>
564    where
565        MsgT: ActorMsg;
566
567    fn create<MsgT>(
568        &self,
569        node_id: impl Into<String>,
570        name: impl Into<String>,
571        _join_on_drop: bool,
572    ) -> SimulatedActorRef<MsgT>
573    where
574        MsgT: ActorMsg,
575    {
576        let node_id_string = node_id.into();
577        let name_string = name.into();
578        let name_arc = Arc::new(name_string);
579        let name_arc2 = name_arc.clone();
580        if self
581            .topology
582            .lock()
583            .unwrap()
584            .get_mut(&node_id_string)
585            .unwrap_or_else(|| panic!("Node {:?} unknown", &node_id_string))
586            .all_handlers
587            .insert(name_arc, Arc::new(Mutex::new(None)))
588            .is_some()
589        {
590            panic!("An actor with such a name already exist");
591        }
592        SimulatedActorRef {
593            actor: Arc::new(Mutex::new(SimulatedActor {
594                node_id: Arc::new(node_id_string),
595                name: name_arc2,
596                events_buffer: self.internal_events_buffer.clone(),
597                topology: self.topology.clone(),
598                tokio_runtime: self.tokio_runtime.clone(),
599                message_type: PhantomData {},
600            })),
601        }
602    }
603}
604
605impl P2PNetworkClient for Simulation {
606    fn attempt_send<MsgT, SerializerT>(
607        &mut self,
608        message: MsgT,
609        _serializer: SerializerT,
610        to_node: impl Into<String>,
611    ) -> P2PNetworkResult<()>
612    where
613        MsgT: ActorMsg,
614        SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
615    {
616        let instant = self.instant_of_p2p_request_send();
617        self.events_queue
618            .lock()
619            .unwrap()
620            .push(SimulationEventAtInstant {
621                instant,
622                event: SimulationEvent::P2PSend {
623                    to_node: Arc::new(to_node.into()),
624                    event: Box::new(message),
625                },
626            });
627        Ok(())
628    }
629}