bftgrid_mt/
tokio.rs

1use std::{
2    collections::HashMap,
3    fmt::Debug,
4    future::Future,
5    io, mem,
6    sync::{Arc, Condvar},
7    time::Duration,
8};
9
10use bftgrid_core::actor::{
11    ActorControl, ActorMsg, ActorRef, ActorSystemHandle, DynMsgHandler, Joinable, P2PNetworkClient,
12    P2PNetworkError, P2PNetworkResult, Task, UntypedMsgHandler,
13};
14use futures::future;
15use tokio::{
16    net::UdpSocket,
17    runtime::Runtime,
18    sync::mpsc::{self as tmpsc, UnboundedSender as TUnboundedSender},
19    task::JoinHandle as TokioJoinHandle,
20};
21
22use crate::{
23    AsyncRuntime, ThreadJoinable, TokioTask, join_tasks, notify_close, push_async_task,
24    spawn_async_task,
25};
26
27#[derive(Debug)]
28struct TokioActorData<MsgT>
29where
30    MsgT: ActorMsg,
31{
32    tx: TUnboundedSender<MsgT>,
33    handler_tx: TUnboundedSender<Arc<tokio::sync::Mutex<DynMsgHandler<MsgT>>>>,
34    close_cond: Arc<(std::sync::Mutex<bool>, Condvar)>,
35    name: Arc<String>,
36}
37
38impl<MsgT> Clone for TokioActorData<MsgT>
39where
40    MsgT: ActorMsg,
41{
42    fn clone(&self) -> Self {
43        TokioActorData {
44            tx: self.tx.clone(),
45            handler_tx: self.handler_tx.clone(),
46            close_cond: self.close_cond.clone(),
47            name: self.name.clone(),
48        }
49    }
50}
51
52#[derive(Debug)]
53struct TokioActor<MsgT>
54where
55    MsgT: ActorMsg,
56{
57    data: TokioActorData<MsgT>,
58    actor_system_handle: TokioActorSystemHandle,
59    join_on_drop: bool,
60}
61
62impl<MsgT> TokioActor<MsgT>
63where
64    MsgT: ActorMsg,
65{
66    fn join(&self) {
67        let (close_mutex, cvar) = &*self.data.close_cond;
68        let mut closed = close_mutex.lock().unwrap();
69        while !*closed {
70            // The wait can be long, so block the tread safely via the runtime.
71            //  In any case, since the actor system is being dropped and this
72            //  is a one-time operation, it does not constitute a performance issue.
73            let runtime = self
74                .actor_system_handle
75                .actor_system
76                .lock()
77                .unwrap()
78                .async_runtime
79                .clone();
80            closed = runtime.thread_blocking(|| cvar.wait(closed).unwrap());
81        }
82    }
83}
84
85impl<MsgT> Drop for TokioActor<MsgT>
86where
87    MsgT: ActorMsg,
88{
89    fn drop(&mut self) {
90        if self.join_on_drop {
91            log::debug!("Tokio actor {} dropping, joining actor", self.data.name);
92            self.join();
93        } else {
94            log::debug!("Tokio actor {} dropping, not joining actor", self.data.name);
95        }
96    }
97}
98
99impl<MsgT> Task for TokioActor<MsgT>
100where
101    MsgT: ActorMsg,
102{
103    fn is_finished(&self) -> bool {
104        *self.data.close_cond.0.lock().unwrap()
105    }
106}
107
108#[derive(Debug)]
109pub struct TokioActorRef<MsgT>
110where
111    MsgT: ActorMsg,
112{
113    actor: Arc<TokioActor<MsgT>>,
114}
115
116impl<MsgT> Clone for TokioActorRef<MsgT>
117where
118    MsgT: ActorMsg,
119{
120    fn clone(&self) -> Self {
121        TokioActorRef {
122            actor: self.actor.clone(),
123        }
124    }
125}
126
127impl<MsgT> Task for TokioActorRef<MsgT>
128where
129    MsgT: ActorMsg,
130{
131    fn is_finished(&self) -> bool {
132        log::debug!("Starting is_finished: locking actor");
133        self.actor.is_finished()
134    }
135}
136
137impl<MsgT> Joinable<()> for TokioActorRef<MsgT>
138where
139    MsgT: ActorMsg,
140{
141    fn join(&mut self) {
142        self.actor.join();
143    }
144}
145
146impl<MsgT> ActorRef<MsgT> for TokioActorRef<MsgT>
147where
148    MsgT: ActorMsg,
149{
150    fn send(&mut self, message: MsgT, delay: Option<Duration>) {
151        let actor = &self.actor;
152        let sender = actor.data.tx.clone();
153        let mut actor_system = actor.actor_system_handle.actor_system.lock().unwrap();
154        actor_system.spawn_async_task(async move {
155            if let Some(delay_duration) = delay {
156                log::debug!("Delaying send by {:?}", delay_duration);
157                tokio::time::sleep(delay_duration).await;
158            }
159            checked_send(sender, message);
160        });
161    }
162
163    fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
164        self.actor
165            .data
166            .handler_tx
167            .send(Arc::new(tokio::sync::Mutex::new(handler)))
168            .unwrap();
169    }
170
171    fn spawn_async_send(
172        &mut self,
173        f: impl Future<Output = MsgT> + Send + 'static,
174        delay: Option<Duration>,
175    ) {
176        let mut self_clone = self.clone();
177        let mut actor_system_lock_guard =
178            self.actor.actor_system_handle.actor_system.lock().unwrap();
179        let async_runtime = actor_system_lock_guard.async_runtime.clone();
180        actor_system_lock_guard.push_async_task(async_runtime.spawn_async(async move {
181            self_clone.send(f.await, delay);
182        }));
183    }
184
185    fn spawn_thread_blocking_send(
186        &mut self,
187        f: impl FnOnce() -> MsgT + Send + 'static,
188        delay: Option<Duration>,
189    ) {
190        let self_clone = self.clone();
191        let mut actor_system_lock_guard =
192            self.actor.actor_system_handle.actor_system.lock().unwrap();
193        let async_runtime = actor_system_lock_guard.async_runtime.clone();
194        actor_system_lock_guard
195            .push_async_task(async_runtime.spawn_thread_blocking_send(f, self_clone, delay));
196    }
197}
198
199fn checked_send<MsgT>(sender: TUnboundedSender<MsgT>, message: MsgT)
200where
201    MsgT: ActorMsg,
202{
203    match sender.send(message) {
204        Ok(_) => (),
205        Err(e) => {
206            log::warn!("Send from tokio actor failed: {:?}", e);
207        }
208    }
209}
210
/// State of a Tokio-backed actor system: its runtime and the tasks spawned through it.
#[derive(Debug)]
pub struct TokioActorSystem {
    /// Runtime on which actor and helper tasks are spawned.
    async_runtime: Arc<AsyncRuntime>,
    /// Tasks tracked by this system; consulted by `is_finished` and drained when joining.
    async_tasks: Vec<TokioTask<()>>,
    /// When `true`, dropping the system joins the tracked tasks.
    join_tasks_on_drop: bool,
}
217
218impl TokioActorSystem {
219    fn spawn_async_task(&mut self, future: impl std::future::Future<Output = ()> + Send + 'static) {
220        spawn_async_task(&mut self.async_tasks, &self.async_runtime, future);
221    }
222
223    fn extract_tasks(&mut self) -> (Vec<ThreadJoinable<()>>, Vec<TokioTask<()>>) {
224        (vec![], mem::take(&mut self.async_tasks))
225    }
226
227    fn create<MsgT>(
228        &mut self,
229        name: impl Into<String>,
230        node_id: impl Into<String>,
231    ) -> TokioActorData<MsgT>
232    where
233        MsgT: ActorMsg,
234    {
235        let (tx, mut rx) = tmpsc::unbounded_channel();
236        let (handler_tx, mut handler_rx) =
237            tmpsc::unbounded_channel::<Arc<tokio::sync::Mutex<DynMsgHandler<MsgT>>>>();
238        let close_cond = Arc::new((std::sync::Mutex::new(false), Condvar::new()));
239        let close_cond2 = close_cond.clone();
240        let actor_name = Arc::new(name.into());
241        let actor_name_clone = actor_name.clone();
242        let actor_node_id = node_id.into();
243        let actor_system_name = self.async_runtime.name.clone();
244        self.spawn_async_task(async move {
245            let mut current_handler = handler_rx.recv().await.unwrap();
246            log::debug!("Async actor '{}' on node '{}' started in tokio actor system '{}'", actor_name, actor_node_id, actor_system_name);
247            loop {
248                if let Ok(new_handler) = handler_rx.try_recv() {
249                    log::debug!("Async actor '{}' on node '{}' in tokio actor system '{}': new handler received", actor_name, actor_node_id, actor_system_name);
250                    current_handler = new_handler;
251                }
252                match rx.recv().await {
253                    None => {
254                        log::info!("Async actor '{}' on node '{}' in tokio actor system '{}': shutting down due to message receive channel having being closed", actor_name, actor_node_id, actor_system_name);
255                        rx.close();
256                        handler_rx.close();
257                        notify_close(close_cond2);
258                        return;
259                    }
260                    Some(m) => {
261                        if let Some(control) = current_handler.lock().await.receive(m) {
262                            match control {
263                                ActorControl::Exit() => {
264                                    log::info!("Async actor '{}' on node '{}' in tokio actor system '{}': closing requested by handler, shutting it down", actor_name, actor_node_id, actor_system_name);
265                                    rx.close();
266                                    handler_rx.close();
267                                    notify_close(close_cond2);
268                                    return;
269                                }
270                            }
271                        }
272                    }
273                }
274            }
275        });
276        TokioActorData {
277            tx,
278            handler_tx,
279            close_cond,
280            name: actor_name_clone,
281        }
282    }
283
284    fn push_async_task(&mut self, tokio_join_handle: TokioJoinHandle<()>) {
285        push_async_task(&mut self.async_tasks, tokio_join_handle);
286    }
287}
288
289impl Task for TokioActorSystem {
290    fn is_finished(&self) -> bool {
291        self.async_tasks.iter().all(|h| h.is_finished())
292    }
293}
294
295impl Drop for TokioActorSystem {
296    fn drop(&mut self) {
297        if self.join_tasks_on_drop {
298            log::debug!(
299                "Tokio actor system '{}' dropping, joining tasks",
300                self.async_runtime.name
301            );
302            join_tasks(self.async_runtime.clone().as_ref(), self.extract_tasks());
303        } else {
304            log::debug!(
305                "Tokio actor system '{}' dropping, not joining tasks",
306                self.async_runtime.name
307            );
308        }
309    }
310}
311
/// Cloneable handle to a [`TokioActorSystem`]; all clones share the same system.
#[derive(Clone, Debug)]
pub struct TokioActorSystemHandle {
    /// The shared actor system, guarded by a standard (blocking) mutex.
    actor_system: Arc<std::sync::Mutex<TokioActorSystem>>,
}
316
317impl TokioActorSystemHandle {
318    /// Owns the passed runtime, using it only if no contextual handle is available;
319    ///  if `None` is passed, it creates a runtime with multi-threaded support,
320    ///  CPU-based thread pool size and all features enabled.
321    pub fn new_actor_system(
322        name: impl Into<String>,
323        tokio: Option<Runtime>,
324        join_tasks_on_drop: bool,
325    ) -> Self {
326        let runtime = AsyncRuntime::new(name, tokio);
327        let actor_system = TokioActorSystem {
328            async_runtime: Arc::new(runtime),
329            async_tasks: vec![],
330            join_tasks_on_drop,
331        };
332        TokioActorSystemHandle {
333            actor_system: Arc::new(std::sync::Mutex::new(actor_system)),
334        }
335    }
336}
337
338impl ActorSystemHandle for TokioActorSystemHandle {
339    type ActorRefT<MsgT>
340        = TokioActorRef<MsgT>
341    where
342        MsgT: ActorMsg;
343
344    fn create<MsgT>(
345        &self,
346        node_id: impl Into<String>,
347        name: impl Into<String>,
348        join_on_drop: bool,
349    ) -> Self::ActorRefT<MsgT>
350    where
351        MsgT: ActorMsg,
352    {
353        TokioActorRef {
354            actor: Arc::new(TokioActor {
355                data: self.actor_system.lock().unwrap().create(name, node_id),
356                actor_system_handle: self.clone(),
357                join_on_drop,
358            }),
359        }
360    }
361}
362
363impl Task for TokioActorSystemHandle {
364    fn is_finished(&self) -> bool {
365        self.actor_system.lock().unwrap().is_finished()
366    }
367}
368
369impl Joinable<()> for TokioActorSystemHandle {
370    fn join(&mut self) {
371        let mut actor_system_lock_guard = self.actor_system.lock().unwrap();
372        let async_runtime = actor_system_lock_guard.async_runtime.clone();
373        let tasks = actor_system_lock_guard.extract_tasks();
374        // Drop the lock before joining tasks to avoid deadlocks if they also lock the actor system
375        drop(actor_system_lock_guard);
376        join_tasks(async_runtime.as_ref(), tasks);
377    }
378}
379
/// UDP-based network server that feeds received datagrams to a message handler.
pub struct TokioP2PNetworkServer {
    /// Socket on which datagrams are received; shared with the receive task.
    socket: Arc<UdpSocket>,
    /// Runtime on which the receive task is spawned.
    runtime: AsyncRuntime,
}
384
385impl TokioP2PNetworkServer {
386    /// Owns the passed runtime, using it only if no contextual handle is available;
387    ///  if `None` is passed, it creates a runtime with multi-threaded support,
388    ///  CPU-based thread pool size and all features enabled.
389    pub fn new(name: impl Into<String>, socket: UdpSocket, tokio: Option<Runtime>) -> Self {
390        TokioP2PNetworkServer {
391            socket: Arc::new(socket),
392            runtime: AsyncRuntime::new(name, tokio),
393        }
394    }
395
396    pub fn start<UntypedMsgHandlerT: UntypedMsgHandler + 'static, MsgT, DeT>(
397        &self,
398        mut handler: UntypedMsgHandlerT,
399        deserializer: DeT,
400        buffer_size: usize,
401    ) -> io::Result<TokioJoinHandle<()>>
402    where
403        MsgT: ActorMsg,
404        DeT: Fn(&mut [u8]) -> P2PNetworkResult<MsgT> + Send + 'static,
405    {
406        let addr = self.socket.local_addr()?;
407        log::info!("Async actor network server '{}' starting", addr);
408        let socket = self.socket.clone();
409        Ok(self.runtime.spawn_async(async move {
410            log::info!("Async actor network server '{}' started", addr);
411            let mut buf = vec![0; buffer_size];
412            loop {
413                log::debug!("Async actor network server '{}' receiving", addr);
414                let (valid_bytes, _) = match socket.recv_from(&mut buf[..]).await {
415                    Ok(bs) => bs,
416                    Err(e) => {
417                        log::warn!(
418                            "Async actor network server '{}': datagram receive failed: {}",
419                            addr,
420                            e
421                        );
422                        continue;
423                    }
424                };
425                log::debug!(
426                    "Async actor network server '{}' received datagram, deserializing",
427                    addr
428                );
429                let message = match deserializer(&mut buf[..valid_bytes]) {
430                    Ok(m) => m,
431                    Err(e) => {
432                        log::warn!(
433                            "Async actor network server '{}': deserialization failed: {:?}",
434                            addr,
435                            e
436                        );
437                        continue;
438                    }
439                };
440                log::debug!(
441                    "Async actor network server '{}' deserialized datagra, sending to input actor",
442                    addr
443                );
444                if let Some(ActorControl::Exit()) = match handler.receive_untyped(Box::new(message))
445                {
446                    Ok(m) => m,
447                    Err(e) => {
448                        log::warn!(
449                            "Async actor network server '{}': handler receive failed: {:?}",
450                            addr,
451                            e
452                        );
453                        continue;
454                    }
455                } {
456                    log::debug!(
457                        "Async actor network server '{}': input actor exited, stopping",
458                        addr
459                    );
460                    break;
461                }
462            }
463        }))
464    }
465}
466
/// UDP-based network client holding one socket (or connection error) per known peer.
#[derive(Debug, Clone)]
pub struct TokioP2PNetworkClient {
    /// Runtime used to run the async socket operations.
    runtime: Arc<AsyncRuntime>,
    /// Per-peer outcome of the connection attempt, keyed by peer address: either the
    /// connected socket or the error recorded while setting it up.
    sockets: HashMap<String, P2PNetworkResult<Arc<UdpSocket>>>,
}
472
impl TokioP2PNetworkClient {
    /// Creates a client and eagerly attempts to connect one UDP socket per initial peer.
    ///
    /// Owns the passed runtime, using it only if no contextual handle is available;
    ///  if `None` is passed, it creates a runtime with multi-threaded support,
    ///  CPU-based thread pool size and all features enabled.
    ///
    /// Connection failures do not fail construction: each peer address is mapped to
    /// either its connected socket or the error met while binding/connecting it
    /// (`P2PNetworkError::Io`) or while joining the connection task
    /// (`P2PNetworkError::Join`).
    pub fn new(
        name: impl Into<String>,
        initial_peers: Vec<impl Into<String>>,
        tokio: Option<Runtime>,
    ) -> Self {
        let runtime = Arc::new(AsyncRuntime::new(name, tokio));
        let runtime_clone = runtime.clone();
        let network_name = runtime.name.clone();
        // `block_on_async` is used here to bind Tokio UDP sockets to peer addresses from any
        //  context, async or thread-blocking. This is a one-time operation and is fast.
        let sockets: HashMap<String, Result<Arc<UdpSocket>, P2PNetworkError>> =
            runtime.block_on_async(async {
                let initial_peer_addrs: Vec<String> =
                    initial_peers.into_iter().map(|p| p.into()).collect(); // Consumed to produce result
                let initial_peer_addrs_clone = initial_peer_addrs.clone(); // Consumed by `connect`
                // Pair each address with the outcome of its connection task; the pairing
                //  relies on `join_all` yielding results in the order the tasks were given.
                initial_peer_addrs
                    .into_iter()
                    .zip(
                        // One task per peer: bind a local socket, then connect it to the peer.
                        future::join_all(initial_peer_addrs_clone.into_iter().map(|peer_addr| {
                            let network_name = network_name.clone();
                            runtime_clone.spawn_async(async move {
                                log::debug!("Async actor network client '{}' started connecting to peer {}", network_name, peer_addr);
                                // Port 0 is requested on `localhost` for the local end of the socket.
                                let socket = UdpSocket::bind("localhost:0").await?;
                                log::debug!(
                                    "Async actor network client '{}' bound UDP socket to local address for connecting to peer {}",
                                    network_name, peer_addr
                                );
                                // `connect` consumes the address, so keep a copy for logging.
                                let p_addr_clone = peer_addr.clone();
                                socket.connect(peer_addr).await.map(|_| {
                                    log::debug!("Async actor network client '{}' connected UDP socket to peer {}", network_name, p_addr_clone);
                                    socket
                                })
                            })
                        }))
                        .await
                        .into_iter()
                        // Flatten the task-join result and the I/O result into a single
                        //  `P2PNetworkResult`, wrapping the socket for sharing.
                        .map(|rr| match rr {
                            Ok(r) => match r {
                                Ok(socket) => {
                                    P2PNetworkResult::<Arc<UdpSocket>>::Ok(Arc::new(socket))
                                }
                                Err(e) => P2PNetworkResult::<Arc<UdpSocket>>::Err(
                                    P2PNetworkError::Io(Arc::new(e)),
                                ),
                            },
                            Err(e) => P2PNetworkResult::<Arc<UdpSocket>>::Err(
                                P2PNetworkError::Join(Arc::new(e)),
                            ),
                        }),
                    )
                    .collect()
            });
        TokioP2PNetworkClient { runtime, sockets }
    }
}
532
533impl P2PNetworkClient for TokioP2PNetworkClient {
534    fn attempt_send<MsgT, SerializerT>(
535        &mut self,
536        message: MsgT,
537        serializer: SerializerT,
538        node_addr: impl Into<String>,
539    ) -> P2PNetworkResult<()>
540    where
541        MsgT: ActorMsg,
542        SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
543    {
544        attempt_send_internal(&self.runtime, &self.sockets, message, serializer, node_addr)
545    }
546}
547
548fn attempt_send_internal<MsgT, SerializerT>(
549    runtime: &AsyncRuntime,
550    sockets: &HashMap<String, P2PNetworkResult<Arc<tokio::net::UdpSocket>>>,
551    message: MsgT,
552    serializer: SerializerT,
553    node_addr: impl Into<String>,
554) -> P2PNetworkResult<()>
555where
556    MsgT: ActorMsg,
557    SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
558{
559    let node_addr: Arc<String> = node_addr.into().into();
560    log::debug!(
561        "Async actor network client '{}' sending to {}",
562        runtime.name,
563        node_addr
564    );
565    let socket_handle = match sockets.get(node_addr.as_ref()) {
566        Some(s) => (*s).clone(),
567        None => P2PNetworkResult::Err(P2PNetworkError::ActorNotFound(node_addr.clone())),
568    }?;
569    let serialized_message = serializer(message)?;
570    let runtime_name = runtime.name.clone();
571    // `block_on_async` is used send over async UDP sockets from any context and is expected to be fast.
572    runtime.block_on_async(async {
573        log::debug!(
574            "Async actor network client '{}' starting network send to {}",
575            runtime_name,
576            node_addr
577        );
578        match socket_handle.send(&serialized_message[..]).await {
579            Ok(_) => {
580                log::debug!(
581                    "Async actor network client '{}' sent a message to {}",
582                    runtime_name,
583                    node_addr
584                );
585                Ok(())
586            }
587            Err(e) => {
588                log::warn!(
589                    "Async actor network client '{}' failed to send message to {}: {:?}",
590                    runtime_name,
591                    node_addr,
592                    e
593                );
594                Err(P2PNetworkError::Io(Arc::new(e)))
595            }
596        }
597    })
598}