bftgrid_core/
actor.rs

1//! An actor framework geared towards BFT ordering libraries with support for deterministic simulation testing.
//! It also allows writing generic system-construction logic that is independent of the specific actor system used.
3//! Every actor is managed by a single actor system but can interact with actors managed by other actor systems.
4//!
5//! No async signatures are used in the public API in order to support single-threaded simulation testing without relying on async runtimes.
6//! Actors should not assume anything about the thread they are running on, nor use any async runtime. They must rely on special functions
7//! to execute thread-blocking and async tasks.
8
9use std::{
10    any::Any,
11    error::Error,
12    fmt::{Debug, Display, Error as FmtError, Formatter},
13    io,
14    sync::Arc,
15    time::Duration,
16};
17
18use dyn_clone::{DynClone, clone_trait_object};
19use thiserror::Error;
20use tokio::task::JoinError;
21
/// Marker trait for messages that can be delivered to actors.
///
/// The supertraits require a message to be cloneable behind a trait object ([`DynClone`]),
/// downcastable ([`Any`], which also implies `'static`), transferable across threads
/// ([`Send`]) and printable for diagnostics ([`Debug`]).
pub trait ActorMsg: DynClone + Any + Send + Debug {}
// Implements `Clone` for `Box<dyn ActorMsg>` through `dyn_clone`.
clone_trait_object!(ActorMsg);
24
/// Control directive that a message handler may return after handling a message.
///
/// The standard diagnostic and comparison traits are derived so that values can be
/// logged, cloned and compared (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorControl {
    /// NOTE(review): presumably asks the owning actor system to stop the actor — confirm
    /// against the actor system implementations.
    Exit(),
}
28
/// A [`TypedMsgHandler`] is an actor behavior that can handle messages of a specific type
/// and optionally return an [`ActorControl`] message.
///
/// Handlers must be [`Send`].
pub trait TypedMsgHandler<MsgT>: Send
where
    MsgT: ActorMsg,
{
    /// Handles a single `message`, taking ownership of it.
    ///
    /// Returns `Some(ActorControl)` to issue a control directive, or `None` when there is none.
    fn receive(&mut self, message: MsgT) -> Option<ActorControl>;
}
37
/// A boxed, dynamically dispatched [`TypedMsgHandler`] for messages of type `MsgT`.
pub type DynMsgHandler<MsgT> = Box<dyn TypedMsgHandler<MsgT>>;

/// A boxed actor message whose concrete type has been erased.
pub type AnActorMsg = Box<dyn ActorMsg>;

// A boxed message is itself a message, so `AnActorMsg` can be used wherever a `MsgT: ActorMsg`
// is expected (the erased APIs in this file use it as their message type).
impl ActorMsg for AnActorMsg {}
43
/// Error value used when a handler is given a message it does not support
/// (it is the error type of `UntypedMsgHandler::receive_untyped`).
#[derive(Debug, Clone)]
pub struct MessageNotSupported();

// No underlying source: the default `Error` methods are sufficient.
impl Error for MessageNotSupported {}

impl Display for MessageNotSupported {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
        f.write_str("Message not supported")
    }
}
54
/// An [`UntypedMsgHandler`] is an actor handler that can receive messages of any type,
/// although it may refuse to handle some of them.
pub trait UntypedMsgHandler: Send {
    /// Handles a type-erased `message`.
    ///
    /// # Errors
    ///
    /// Returns [`MessageNotSupported`] when the handler refuses the message.
    fn receive_untyped(
        &mut self,
        message: AnActorMsg,
    ) -> Result<Option<ActorControl>, MessageNotSupported>;
}

/// A boxed, dynamically dispatched [`UntypedMsgHandler`].
pub type UntypedHandlerBox = Box<dyn UntypedMsgHandler>;
65
/// A [`Task`] can be queried for completion.
pub trait Task: Send + Debug {
    /// Reports whether the task has finished; takes `&self` and returns immediately with a flag.
    fn is_finished(&self) -> bool;
}
70
/// A [`Joinable`] can be awaited for completion in a thread-blocking fashion.
/// Specific types of [`ActorRef`] and [`ActorSystemHandle`] can be joined to wait for their completion.
pub trait Joinable<Output>: Task + Send + Debug {
    /// Waits (blocking the calling thread) for completion and returns the resulting `Output`.
    fn join(&mut self) -> Output;
}
76
/// An [`ActorRef`] can asynchronously send messages to the underlying actor, optionally with a delay,
/// change the actor handler, spawn an async self-send, spawn a thread-blocking self-send and it can be cloned.
/// Actor implementations can use [`ActorRef`]s to send messages to themselves and to other actors.
pub trait ActorRef<MsgT>: DynClone + Send + Debug
where
    MsgT: ActorMsg + 'static,
{
    /// Asynchronously sends `message` to the underlying actor, optionally after `delay`.
    fn send(&mut self, message: MsgT, delay: Option<Duration>);

    /// Replaces the handler that processes the actor's messages.
    fn set_handler(&mut self, handler: DynMsgHandler<MsgT>);

    /// Spawns the future `f` and sends the message it produces to the actor.
    ///
    /// NOTE(review): whether `delay` applies before running `f` or before delivering its
    /// result is implementation-defined — confirm against the actor system in use.
    fn spawn_async_send(
        &mut self,
        f: impl Future<Output = MsgT> + Send + 'static,
        delay: Option<Duration>,
    );

    /// Runs the (potentially thread-blocking) closure `f` and sends the message it returns
    /// to the actor.
    ///
    /// NOTE(review): the exact point at which `delay` is applied is implementation-defined —
    /// confirm against the actor system in use.
    fn spawn_thread_blocking_send(
        &mut self,
        f: impl FnOnce() -> MsgT + Send + 'static,
        delay: Option<Duration>,
    );
}
100
/// An [`ActorSystemHandle`] allows spawning actors by creating an [`ActorRef`] and it can be cloned.
/// Actors themselves can use [`ActorSystemHandle`]s to spawn new actors.
pub trait ActorSystemHandle: DynClone + Send {
    /// The concrete [`ActorRef`] type this actor system produces for messages of type `MsgT`.
    type ActorRefT<MsgT>: ActorRef<MsgT>
    where
        MsgT: ActorMsg;

    /// Creates a new actor and returns a reference to it.
    ///
    /// NOTE(review): `node_id` and `name` presumably identify the actor within the system, and
    /// `join_on_drop` presumably makes dropping the reference wait for the actor — confirm
    /// against the actor system implementations.
    fn create<MsgT>(
        &self,
        node_id: impl Into<String>,
        name: impl Into<String>,
        join_on_drop: bool,
    ) -> Self::ActorRefT<MsgT>
    where
        MsgT: ActorMsg;
}
117
118#[derive(Error, Debug, Clone)]
119pub enum P2PNetworkError {
120    #[error("I/O error")]
121    Io(#[from] Arc<io::Error>),
122    #[error("Join error")]
123    Join(#[from] Arc<JoinError>),
124    #[error("Actor not found")]
125    ActorNotFound(Arc<String>),
126    #[error("Message type not supported")]
127    MessageNotSupported,
128}
129
130pub type P2PNetworkResult<R> = Result<R, P2PNetworkError>;
131
/// A [`P2PNetworkClient`] allows sending messages to other nodes in a P2P network.
pub trait P2PNetworkClient: DynClone {
    /// Attempts to send `message` to `node`, using `serializer` to turn it into bytes.
    ///
    /// # Errors
    ///
    /// Returns a [`P2PNetworkError`] when the attempt fails; `serializer` itself may also
    /// produce one.
    fn attempt_send<MsgT, SerializerT>(
        &mut self,
        message: MsgT,
        serializer: SerializerT,
        node: impl Into<String>,
    ) -> P2PNetworkResult<()>
    where
        MsgT: ActorMsg,
        SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync + 'static;
}
144
/// Type-erased, object-safe versions of the generic traits of the parent module, usable as boxed trait objects.
146pub mod erased {
147    use std::any::Any;
148    use std::fmt::Debug;
149    use std::marker::PhantomData;
150    use std::{pin::Pin, time::Duration};
151
152    use dyn_clone::DynClone;
153
154    use crate::actor;
155
156    use super::{ActorMsg, AnActorMsg, DynMsgHandler, P2PNetworkResult, TypedMsgHandler};
157
    /// A pinned, boxed, `Send + 'static` future that resolves to a `MsgT`.
    pub type DynFuture<MsgT> = Pin<Box<dyn Future<Output = MsgT> + Send + 'static>>;
    /// A boxed, `Send + 'static` one-shot closure that produces a `MsgT` when called.
    pub type DynLazy<MsgT> = Box<dyn FnOnce() -> MsgT + Send + 'static>;
160
    /// Object-safe counterpart of [`actor::ActorRef`]: the generic `impl Trait` parameters are
    /// replaced by the boxed [`DynFuture`] and [`DynLazy`] types so that the trait can be used
    /// as `dyn ActorRef<MsgT>`.
    pub trait ActorRef<MsgT>: DynClone + Send + Debug
    where
        MsgT: ActorMsg + 'static,
    {
        /// Sends `message` to the underlying actor, optionally with a `delay`.
        fn send(&mut self, message: MsgT, delay: Option<Duration>);

        /// Replaces the handler that processes the actor's messages.
        fn set_handler(&mut self, handler: DynMsgHandler<MsgT>);

        /// Boxed-future version of [`actor::ActorRef::spawn_async_send`].
        fn spawn_async_send(&mut self, f: DynFuture<MsgT>, delay: Option<Duration>);

        /// Boxed-closure version of [`actor::ActorRef::spawn_thread_blocking_send`].
        fn spawn_thread_blocking_send(&mut self, f: DynLazy<MsgT>, delay: Option<Duration>);
    }
173
174    pub type DynActorRef<MsgT> = Box<dyn ActorRef<MsgT>>;
175
176    impl<MsgT> Clone for DynActorRef<MsgT> {
177        fn clone(&self) -> Self {
178            dyn_clone::clone_box(&**self)
179        }
180    }
181
182    impl<ActorRefT, MsgT> ActorRef<MsgT> for ActorRefT
183    where
184        ActorRefT: actor::ActorRef<MsgT> + ?Sized,
185        MsgT: ActorMsg + 'static,
186    {
187        fn send(&mut self, message: MsgT, delay: Option<Duration>) {
188            self.send(message, delay);
189        }
190
191        fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
192            self.set_handler(handler);
193        }
194
195        fn spawn_async_send(&mut self, f: DynFuture<MsgT>, delay: Option<Duration>) {
196            self.spawn_async_send(f, delay);
197        }
198
199        fn spawn_thread_blocking_send(&mut self, f: DynLazy<MsgT>, delay: Option<Duration>) {
200            self.spawn_thread_blocking_send(f, delay);
201        }
202    }
203
204    impl<MsgT> actor::ActorRef<MsgT> for dyn ActorRef<MsgT>
205    where
206        MsgT: ActorMsg + 'static,
207    {
208        fn send(&mut self, message: MsgT, delay: Option<Duration>) {
209            ActorRef::send(self, message, delay);
210        }
211
212        fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
213            ActorRef::set_handler(self, handler);
214        }
215
216        fn spawn_async_send(
217            &mut self,
218            f: impl Future<Output = MsgT> + Send + 'static,
219            delay: Option<Duration>,
220        ) {
221            ActorRef::spawn_async_send(self, Box::pin(f), delay);
222        }
223
224        fn spawn_thread_blocking_send(
225            &mut self,
226            f: impl FnOnce() -> MsgT + Send + 'static,
227            delay: Option<Duration>,
228        ) {
229            ActorRef::spawn_thread_blocking_send(self, Box::new(f), delay);
230        }
231    }
232
    /// Object-safe counterpart of [`actor::ActorSystemHandle`]: it has no generic parameters
    /// and always creates references to actors that receive type-erased [`AnActorMsg`]s.
    pub trait ActorSystemHandle: DynClone + Send {
        /// Creates a new actor and returns a boxed reference to it.
        ///
        /// The parameters mirror those of [`actor::ActorSystemHandle::create`], with owned
        /// `String`s instead of `impl Into<String>`.
        fn create(
            &self,
            node_id: String,
            name: String,
            join_on_drop: bool,
        ) -> DynActorRef<AnActorMsg>;
    }
241
242    pub type DynActorSystemHandle = Box<dyn ActorSystemHandle>;
243
244    impl Clone for DynActorSystemHandle {
245        fn clone(&self) -> Self {
246            dyn_clone::clone_box(&**self)
247        }
248    }
249
250    impl<ActorSystemHandleT> ActorSystemHandle for ActorSystemHandleT
251    where
252        ActorSystemHandleT: actor::ActorSystemHandle + ?Sized + 'static,
253    {
254        fn create(
255            &self,
256            node_id: String,
257            name: String,
258            join_on_drop: bool,
259        ) -> DynActorRef<AnActorMsg> {
260            Box::new(self.create(node_id, name, join_on_drop))
261        }
262    }
263
264    #[derive(Debug)]
265    pub struct DynActorRefWrapped<MsgT>
266    where
267        MsgT: ActorMsg,
268    {
269        an_actor_ref: DynActorRef<AnActorMsg>,
270        _p: PhantomData<MsgT>,
271    }
272
273    impl<MsgT> Clone for DynActorRefWrapped<MsgT>
274    where
275        MsgT: ActorMsg,
276    {
277        fn clone(&self) -> Self {
278            DynActorRefWrapped {
279                an_actor_ref: self.an_actor_ref.clone(),
280                _p: PhantomData {},
281            }
282        }
283    }
284
285    struct DowncastingMsgHandler<MsgT>
286    where
287        MsgT: ActorMsg,
288    {
289        dyn_msg_handler: DynMsgHandler<MsgT>,
290    }
291
292    impl<MsgT> TypedMsgHandler<AnActorMsg> for DowncastingMsgHandler<MsgT>
293    where
294        MsgT: ActorMsg,
295    {
296        fn receive(&mut self, message: AnActorMsg) -> Option<actor::ActorControl> {
297            match (message as Box<dyn Any>).downcast::<MsgT>() {
298                Ok(m) => self.dyn_msg_handler.receive(*m),
299                Err(_) => None,
300            }
301        }
302    }
303
304    impl<MsgT> actor::ActorRef<MsgT> for DynActorRefWrapped<MsgT>
305    where
306        MsgT: ActorMsg,
307    {
308        fn send(&mut self, message: MsgT, delay: Option<Duration>) {
309            self.an_actor_ref
310                .send(Box::new(message) as Box<dyn ActorMsg>, delay);
311        }
312
313        fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
314            self.an_actor_ref
315                .set_handler(Box::new(DowncastingMsgHandler::<MsgT> {
316                    dyn_msg_handler: handler,
317                }));
318        }
319
320        fn spawn_async_send(
321            &mut self,
322            f: impl Future<Output = MsgT> + Send + 'static,
323            delay: Option<Duration>,
324        ) {
325            self.an_actor_ref.spawn_async_send(
326                Box::pin(async move { Box::new(f.await) as Box<dyn ActorMsg> }),
327                delay,
328            );
329        }
330
331        fn spawn_thread_blocking_send(
332            &mut self,
333            f: impl FnOnce() -> MsgT + Send + 'static,
334            delay: Option<Duration>,
335        ) {
336            self.an_actor_ref
337                .spawn_thread_blocking_send(Box::new(|| Box::new(f())), delay);
338        }
339    }
340
341    impl actor::ActorSystemHandle for dyn ActorSystemHandle {
342        type ActorRefT<MsgT>
343            = DynActorRefWrapped<MsgT>
344        where
345            MsgT: ActorMsg;
346
347        fn create<MsgT>(
348            &self,
349            node_id: impl Into<String>,
350            name: impl Into<String>,
351            join_on_drop: bool,
352        ) -> Self::ActorRefT<MsgT>
353        where
354            MsgT: ActorMsg,
355        {
356            DynActorRefWrapped {
357                an_actor_ref: ActorSystemHandle::create(
358                    self,
359                    node_id.into(),
360                    name.into(),
361                    join_on_drop,
362                ),
363                _p: PhantomData {},
364            }
365        }
366    }
367
    /// Object-safe counterpart of [`actor::P2PNetworkClient`]: messages are type-erased
    /// [`AnActorMsg`]s and the serializer is a boxed closure.
    pub trait P2PNetworkClient: DynClone + Send {
        /// Attempts to send the erased `message` to `node`, using `serializer` to turn it
        /// into bytes.
        ///
        /// # Errors
        ///
        /// Returns a [`actor::P2PNetworkError`] when the attempt fails.
        fn attempt_send(
            &mut self,
            message: AnActorMsg,
            serializer: Box<dyn Fn(AnActorMsg) -> P2PNetworkResult<Vec<u8>> + Sync>,
            node: String,
        ) -> P2PNetworkResult<()>;
    }
376
377    pub type DynP2PNetworkClient = Box<dyn P2PNetworkClient>;
378
379    impl Clone for DynP2PNetworkClient {
380        fn clone(&self) -> Self {
381            dyn_clone::clone_box(&**self)
382        }
383    }
384
385    impl<P2PNetworkClientT> P2PNetworkClient for P2PNetworkClientT
386    where
387        P2PNetworkClientT: actor::P2PNetworkClient + Send + ?Sized + 'static,
388    {
389        fn attempt_send(
390            &mut self,
391            message: AnActorMsg,
392            serializer: Box<dyn Fn(AnActorMsg) -> P2PNetworkResult<Vec<u8>> + Sync>,
393            node: String,
394        ) -> P2PNetworkResult<()> {
395            self.attempt_send(message, serializer, node)
396        }
397    }
398
399    impl actor::P2PNetworkClient for dyn P2PNetworkClient {
400        fn attempt_send<MsgT, SerializerT>(
401            &mut self,
402            message: MsgT,
403            serializer: SerializerT,
404            node: impl Into<String>,
405        ) -> P2PNetworkResult<()>
406        where
407            MsgT: ActorMsg,
408            SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync + 'static,
409        {
410            P2PNetworkClient::attempt_send(
411                self,
412                Box::new(message),
413                Box::new(move |msg| match (msg as Box<dyn Any>).downcast::<MsgT>() {
414                    Ok(typed_msg) => serializer(*typed_msg),
415                    Err(_) => Err(actor::P2PNetworkError::MessageNotSupported),
416                }),
417                node.into(),
418            )
419        }
420    }
421}