1use std::{
4 any::Any,
5 collections::{BinaryHeap, HashMap},
6 fmt::Debug,
7 future::Future,
8 marker::PhantomData,
9 mem,
10 ops::{Add, ControlFlow},
11 sync::{Arc, Mutex},
12 time::{Duration, Instant},
13};
14
15use bftgrid_core::actor::{
16 ActorControl, ActorMsg, ActorRef, ActorSystemHandle, AnActorMsg, DynMsgHandler,
17 MessageNotSupported, P2PNetworkClient, P2PNetworkResult, UntypedMsgHandler,
18};
19use rand_chacha::{
20 ChaCha8Rng,
21 rand_core::{RngCore, SeedableRng},
22};
23
24pub type UntypedHandlerBox = Box<dyn UntypedMsgHandler>;
25
26struct SimulationMsgHandler<MsgT> {
27 handler: DynMsgHandler<MsgT>,
28}
29
30impl<MsgT> UntypedMsgHandler for SimulationMsgHandler<MsgT>
39where
40 MsgT: ActorMsg,
41{
42 fn receive_untyped(
43 &mut self,
44 message: AnActorMsg,
45 ) -> Result<Option<ActorControl>, MessageNotSupported> {
46 match (message.clone() as Box<dyn Any>).downcast::<MsgT>() {
47 Ok(typed_message) => Result::Ok(self.handler.receive(*typed_message)),
48 Err(_) => {
49 match (Box::new(message) as Box<dyn Any>).downcast::<MsgT>() {
51 Ok(typed_message) => Result::Ok(self.handler.receive(*typed_message)),
52 Err(_) => Result::Err(MessageNotSupported()),
53 }
54 }
55 }
56 }
57}
58
/// Fixed seed of the simulation's pseudo-random generator.
const SEED: u64 = 10;
/// Upper bound of the pseudo-random offset added to the current simulated
/// instant when an event is scheduled.
const MAX_RANDOM_DURATION: Duration = Duration::from_secs(1);
61
/// Virtual clock of a simulation run.
pub struct SimulatedClock {
    /// Current simulated time; the simulation loop sets it to the instant of
    /// each event it pops from the queue.
    current_instant: Instant,
}
65
66#[derive(Clone, Debug)]
67pub struct InternalEvent {
68 node: Arc<String>,
69 handler: Arc<String>,
70 event: Box<dyn ActorMsg>,
71 delay: Option<Duration>,
72}
73
74#[derive(Clone, Debug)]
75pub enum SimulationEvent {
76 ClientSend {
77 to_node: Arc<String>,
79 event: Box<dyn ActorMsg>,
80 },
81 ClientRequest {
82 to_node: Arc<String>,
84 event: Box<dyn ActorMsg>,
85 },
86 P2PSend {
87 to_node: Arc<String>,
89 event: Box<dyn ActorMsg>,
90 },
91 P2PRequest {
92 node: Arc<String>,
94 event: Box<dyn ActorMsg>,
95 },
96 Internal {
97 event: InternalEvent,
99 },
100 SimulationEnd,
101}
102
103#[derive(Clone, Debug)]
104pub struct SimulationEventAtInstant {
105 instant: Instant,
106 event: SimulationEvent,
107}
108
109impl PartialEq for SimulationEventAtInstant {
110 fn eq(&self, other: &Self) -> bool {
111 self.instant == other.instant
112 }
113}
114
115impl Eq for SimulationEventAtInstant {}
116
117impl Ord for SimulationEventAtInstant {
118 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
119 other.instant.cmp(&self.instant) }
121}
122
123impl PartialOrd for SimulationEventAtInstant {
124 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
125 Some(self.cmp(other))
126 }
127}
128
129pub struct NodeDescriptor {
130 pub client_request_handler: Option<Arc<String>>,
131 pub p2p_request_handler: Option<Arc<String>>,
132 pub all_handlers: HashMap<Arc<String>, Arc<Mutex<Option<UntypedHandlerBox>>>>,
133}
134
135impl std::fmt::Debug for NodeDescriptor {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("NodeDescriptor")
138 .field("client_request_handler", &self.client_request_handler)
139 .field("p2p_request_handler", &self.p2p_request_handler)
140 .field(
141 "all_handlers",
142 &self.all_handlers.keys().collect::<Vec<_>>(),
143 )
144 .finish()
145 }
146}
147
148impl NodeDescriptor {
149 pub fn new<C: Into<String>, P: Into<String>>(
150 client_request_handler: Option<C>,
151 p2p_request_handler: Option<P>,
152 ) -> Self {
153 NodeDescriptor {
154 client_request_handler: client_request_handler.map(|h| Arc::new(h.into())),
155 p2p_request_handler: p2p_request_handler.map(|h| Arc::new(h.into())),
156 all_handlers: Default::default(),
157 }
158 }
159}
160
161impl Default for NodeDescriptor {
162 fn default() -> Self {
163 Self::new::<String, String>(Default::default(), Default::default())
164 }
165}
166
167type Topology = HashMap<String, NodeDescriptor>;
168
169struct SimulatedActor<MsgT> {
170 node_id: Arc<String>,
171 name: Arc<String>,
172 events_buffer: Arc<Mutex<Vec<InternalEvent>>>,
173 topology: Arc<Mutex<Topology>>,
174 tokio_runtime: Arc<tokio::runtime::Runtime>,
175 message_type: PhantomData<MsgT>,
176}
177
178impl<MsgT> std::fmt::Debug for SimulatedActor<MsgT> {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("SimulatedActor")
181 .field("node_id", &self.node_id)
182 .field("name", &self.name)
183 .finish()
184 }
185}
186
187#[derive(Debug)]
188pub struct SimulatedActorRef<MsgT> {
189 actor: Arc<Mutex<SimulatedActor<MsgT>>>,
190}
191
192impl<MsgT> Clone for SimulatedActorRef<MsgT> {
193 fn clone(&self) -> Self {
194 SimulatedActorRef {
195 actor: self.actor.clone(),
196 }
197 }
198}
199
200impl<MsgT> ActorRef<MsgT> for SimulatedActorRef<MsgT>
201where
202 MsgT: ActorMsg,
203{
204 fn send(&mut self, message: MsgT, delay: Option<Duration>) {
205 let actor = self.actor.lock().unwrap();
206 actor.events_buffer.lock().unwrap().push(InternalEvent {
207 node: actor.node_id.clone(),
208 handler: actor.name.clone(),
209 event: Box::new(message),
210 delay,
211 });
212 }
213
214 fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
215 let actor = self.actor.lock().unwrap();
216 let mut topology = actor.topology.lock().unwrap();
217 let node = topology.get_mut(actor.node_id.as_ref()).unwrap();
218 node.all_handlers.insert(
219 actor.name.clone(),
220 Arc::new(Mutex::new(Some(Box::new(SimulationMsgHandler { handler })))),
221 );
222 }
223
224 fn spawn_async_send(
225 &mut self,
226 f: impl Future<Output = MsgT> + 'static,
227 delay: Option<Duration>,
228 ) {
229 let runtime = self.actor.lock().unwrap().tokio_runtime.clone();
230 let res = runtime.block_on(f);
231 self.send(res, delay);
232 }
233
234 fn spawn_thread_blocking_send(
235 &mut self,
236 f: impl FnOnce() -> MsgT + Send + 'static,
237 delay: Option<Duration>,
238 ) {
239 self.send(f(), delay);
240 }
241}
242
243pub struct Simulation {
244 topology: Arc<Mutex<Topology>>,
245 exited_actors: Arc<Mutex<Vec<Arc<String>>>>,
246 internal_events_buffer: Arc<Mutex<Vec<InternalEvent>>>,
247 events_queue: Arc<Mutex<BinaryHeap<SimulationEventAtInstant>>>,
248 clock: Arc<Mutex<SimulatedClock>>,
249 random: ChaCha8Rng,
250 end_instant: Instant,
251 tokio_runtime: Arc<tokio::runtime::Runtime>,
252}
253
254impl Simulation {
255 pub fn new(topology: Topology, start_instant: Instant, end_instant: Instant) -> Simulation {
256 Simulation {
257 topology: Arc::new(Mutex::new(topology)),
258 exited_actors: Default::default(),
259 internal_events_buffer: Default::default(),
260 events_queue: Default::default(),
261 clock: Arc::new(Mutex::new(SimulatedClock {
262 current_instant: start_instant,
263 })),
264 random: ChaCha8Rng::seed_from_u64(SEED),
265 end_instant,
266 tokio_runtime: Arc::new(
267 tokio::runtime::Builder::new_multi_thread()
268 .enable_all()
269 .build()
270 .unwrap(),
271 ),
272 }
273 }
274
275 pub fn client_send<MsgT>(&mut self, to_node: String, message: MsgT)
276 where
277 MsgT: ActorMsg + 'static,
278 {
279 let instant = self.instant_of_client_request_send();
280 self.events_queue
281 .lock()
282 .unwrap()
283 .push(SimulationEventAtInstant {
284 instant,
285 event: SimulationEvent::ClientSend {
286 to_node: Arc::new(to_node),
287 event: Box::new(message),
288 },
289 })
290 }
291
292 pub fn run(mut self) -> Vec<SimulationEventAtInstant> {
293 let mut result = Vec::new();
294 self.events_queue
295 .lock()
296 .unwrap()
297 .push(SimulationEventAtInstant {
298 instant: self.end_instant,
299 event: SimulationEvent::SimulationEnd,
300 });
301
302 while !self.events_queue.lock().unwrap().is_empty() {
303 let mut mutex_buffer = self.internal_events_buffer.lock().unwrap();
304 let mut events_buffer = Vec::new();
305 mem::swap(&mut *mutex_buffer, &mut events_buffer);
306 drop(mutex_buffer);
307 for event in events_buffer {
308 let instant = self.instant_of_internal_event();
309 self.events_queue
310 .lock()
311 .unwrap()
312 .push(SimulationEventAtInstant {
313 instant,
314 event: SimulationEvent::Internal { event },
315 })
316 }
317
318 let e = self.events_queue.lock().unwrap().pop().unwrap();
319 let e_clone = e.clone();
320 result.push(e_clone);
321 self.clock.lock().unwrap().current_instant = e.instant;
322
323 if let ControlFlow::Break(_) = self.handle_event(e) {
324 break;
325 }
326 }
327 result
328 }
329
330 fn handle_event(&mut self, e: SimulationEventAtInstant) -> ControlFlow<()> {
331 match e.event {
332 SimulationEvent::ClientSend { to_node, event } => {
333 self.handle_client_send(to_node, event)
334 }
335 SimulationEvent::ClientRequest { to_node, event } => {
336 self.handle_client_request(to_node, event)
337 }
338 SimulationEvent::P2PSend { to_node, event } => self.handle_p2p_send(to_node, event),
339 SimulationEvent::P2PRequest { node, event } => self.handle_p2p_request(node, event),
340 SimulationEvent::Internal {
341 event:
342 InternalEvent {
343 node,
344 handler,
345 event,
346 delay,
347 },
348 } => self.handle_internal_event(delay, e.instant, node, handler, event),
349 SimulationEvent::SimulationEnd => return ControlFlow::Break(()),
350 }
351 ControlFlow::Continue(())
352 }
353
354 fn handle_internal_event(
355 &mut self,
356 delay: Option<Duration>,
357 instant: Instant,
358 node: Arc<String>,
359 handler: Arc<String>,
360 event: Box<dyn ActorMsg>,
361 ) {
362 match delay {
363 Some(duration) => {
364 self.events_queue
365 .lock()
366 .unwrap()
367 .push(SimulationEventAtInstant {
368 instant: instant.add(duration),
369 event: SimulationEvent::Internal {
370 event: InternalEvent {
371 node,
372 handler,
373 event,
374 delay: None,
375 },
376 },
377 });
378 }
379 None => {
380 if self.exited_actors.lock().unwrap().contains(&handler) {
381 panic!("message sent to actor that has exited")
382 }
383 let mut topology = self.topology.lock().unwrap();
384 let mut removed_node = topology
385 .remove(node.as_ref())
386 .unwrap_or_else(|| panic!("node {:?} unknown", node));
387 let removed_handler_arc = removed_node
388 .all_handlers
389 .remove(&handler)
390 .unwrap_or_else(|| {
391 panic!("handler {:?} not found for node {:?}", handler, node)
392 });
393 topology.insert((*node).clone(), removed_node);
394 drop(topology); if let Some(control) = removed_handler_arc
396 .lock()
397 .unwrap()
398 .as_mut()
399 .unwrap_or_else(|| {
400 panic!("handler {:?} not found for node {:?}", handler, node)
401 })
402 .receive_untyped(event.clone())
403 .unwrap_or_else(|_| {
404 panic!(
405 "handler {:?} for node {:?} cannot handle event {:?}",
406 handler, node, event
407 )
408 })
409 {
410 match control {
411 ActorControl::Exit() => self.exited_actors.lock().unwrap().push(handler),
412 }
413 } else {
414 let mut topology = self.topology.lock().unwrap();
415 let mut removed_node = topology
416 .remove(node.as_ref())
417 .unwrap_or_else(|| panic!("node {:?} not found", node));
418 removed_node
419 .all_handlers
420 .insert(handler, removed_handler_arc.clone());
421 topology.insert((*node).clone(), removed_node);
422 };
423 }
424 }
425 }
426
427 fn handle_p2p_request(&mut self, node: Arc<String>, event: Box<dyn ActorMsg>) {
428 let internal_event_instant = self.instant_of_internal_event();
429 let p2p_request_handler = self
430 .topology
431 .lock()
432 .unwrap()
433 .get(node.as_ref())
434 .unwrap()
435 .p2p_request_handler
436 .as_ref()
437 .unwrap_or_else(|| panic!("p2p request handler unset for node {:?}", node))
438 .clone();
439 self.events_queue
440 .lock()
441 .unwrap()
442 .push(SimulationEventAtInstant {
443 instant: internal_event_instant,
444 event: SimulationEvent::Internal {
445 event: InternalEvent {
446 node,
447 handler: p2p_request_handler,
448 event,
449 delay: None,
450 },
451 },
452 })
453 }
454
455 fn handle_p2p_send(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
456 let p2p_request_arrival_instant = self.instant_of_p2p_request_arrival();
457 self.events_queue
458 .lock()
459 .unwrap()
460 .push(SimulationEventAtInstant {
461 instant: p2p_request_arrival_instant,
462 event: SimulationEvent::P2PRequest {
463 node: to_node,
464 event,
465 },
466 });
467 }
468
469 fn handle_client_request(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
470 let internal_event_instant = self.instant_of_internal_event();
471 let client_request_handler = self
472 .topology
473 .lock()
474 .unwrap()
475 .get(to_node.as_ref())
476 .unwrap_or_else(|| panic!("node {:?} unknown", to_node))
477 .client_request_handler
478 .as_ref()
479 .unwrap_or_else(|| panic!("client request handler unset for node {:?}", to_node))
480 .clone();
481 self.events_queue
482 .lock()
483 .unwrap()
484 .push(SimulationEventAtInstant {
485 instant: internal_event_instant,
486 event: SimulationEvent::Internal {
487 event: InternalEvent {
488 node: to_node,
489 handler: client_request_handler,
490 event,
491 delay: None,
492 },
493 },
494 })
495 }
496
497 fn handle_client_send(&mut self, to_node: Arc<String>, event: Box<dyn ActorMsg>) {
498 let client_request_arrival_instant = self.instant_of_client_request_arrival();
499 self.events_queue
500 .lock()
501 .unwrap()
502 .push(SimulationEventAtInstant {
503 instant: client_request_arrival_instant,
504 event: SimulationEvent::ClientRequest { to_node, event },
505 })
506 }
507
508 fn instant_of_internal_event(&mut self) -> Instant {
509 let pseudo_random_between_zero_and_one = self.random.next_u64() as f64 / u64::MAX as f64;
510 let pseudo_random_duration =
511 MAX_RANDOM_DURATION.mul_f64(pseudo_random_between_zero_and_one);
512 self.clock
513 .lock()
514 .unwrap()
515 .current_instant
516 .add(pseudo_random_duration)
517 }
518
519 fn instant_of_client_request_send(&mut self) -> Instant {
520 self.instant_of_internal_event()
521 }
522
523 fn instant_of_client_request_arrival(&mut self) -> Instant {
524 self.instant_of_internal_event()
525 }
526
527 fn instant_of_p2p_request_send(&mut self) -> Instant {
528 self.instant_of_internal_event()
529 }
530
531 fn instant_of_p2p_request_arrival(&mut self) -> Instant {
532 self.instant_of_internal_event()
533 }
534}
535
536impl Clone for Simulation {
537 fn clone(&self) -> Self {
538 Simulation {
539 topology: self.topology.clone(),
540 exited_actors: self.exited_actors.clone(),
541 internal_events_buffer: self.internal_events_buffer.clone(),
542 events_queue: self.events_queue.clone(),
543 clock: self.clock.clone(),
544 random: self.random.clone(),
545 end_instant: self.end_instant,
546 tokio_runtime: self.tokio_runtime.clone(),
547 }
548 }
549}
550
551impl std::fmt::Debug for Simulation {
552 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553 f.debug_struct("Simulation")
554 .field("topology", &self.topology)
555 .field("random", &self.random)
556 .field("end_instant", &self.end_instant)
557 .finish()
558 }
559}
560
561impl ActorSystemHandle for Simulation {
562 type ActorRefT<MsgT>
563 = SimulatedActorRef<MsgT>
564 where
565 MsgT: ActorMsg;
566
567 fn create<MsgT>(
568 &self,
569 node_id: impl Into<String>,
570 name: impl Into<String>,
571 _join_on_drop: bool,
572 ) -> SimulatedActorRef<MsgT>
573 where
574 MsgT: ActorMsg,
575 {
576 let node_id_string = node_id.into();
577 let name_string = name.into();
578 let name_arc = Arc::new(name_string);
579 let name_arc2 = name_arc.clone();
580 if self
581 .topology
582 .lock()
583 .unwrap()
584 .get_mut(&node_id_string)
585 .unwrap_or_else(|| panic!("Node {:?} unknown", &node_id_string))
586 .all_handlers
587 .insert(name_arc, Arc::new(Mutex::new(None)))
588 .is_some()
589 {
590 panic!("An actor with such a name already exist");
591 }
592 SimulatedActorRef {
593 actor: Arc::new(Mutex::new(SimulatedActor {
594 node_id: Arc::new(node_id_string),
595 name: name_arc2,
596 events_buffer: self.internal_events_buffer.clone(),
597 topology: self.topology.clone(),
598 tokio_runtime: self.tokio_runtime.clone(),
599 message_type: PhantomData {},
600 })),
601 }
602 }
603}
604
605impl P2PNetworkClient for Simulation {
606 fn attempt_send<MsgT, SerializerT>(
607 &mut self,
608 message: MsgT,
609 _serializer: SerializerT,
610 to_node: impl Into<String>,
611 ) -> P2PNetworkResult<()>
612 where
613 MsgT: ActorMsg,
614 SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
615 {
616 let instant = self.instant_of_p2p_request_send();
617 self.events_queue
618 .lock()
619 .unwrap()
620 .push(SimulationEventAtInstant {
621 instant,
622 event: SimulationEvent::P2PSend {
623 to_node: Arc::new(to_node.into()),
624 event: Box::new(message),
625 },
626 });
627 Ok(())
628 }
629}