1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 future::Future,
5 io, mem,
6 sync::{Arc, Condvar},
7 time::Duration,
8};
9
10use bftgrid_core::actor::{
11 ActorControl, ActorMsg, ActorRef, ActorSystemHandle, DynMsgHandler, Joinable, P2PNetworkClient,
12 P2PNetworkError, P2PNetworkResult, Task, UntypedMsgHandler,
13};
14use futures::future;
15use tokio::{
16 net::UdpSocket,
17 runtime::Runtime,
18 sync::mpsc::{self as tmpsc, UnboundedSender as TUnboundedSender},
19 task::JoinHandle as TokioJoinHandle,
20};
21
22use crate::{
23 AsyncRuntime, ThreadJoinable, TokioTask, join_tasks, notify_close, push_async_task,
24 spawn_async_task,
25};
26
27#[derive(Debug)]
28struct TokioActorData<MsgT>
29where
30 MsgT: ActorMsg,
31{
32 tx: TUnboundedSender<MsgT>,
33 handler_tx: TUnboundedSender<Arc<tokio::sync::Mutex<DynMsgHandler<MsgT>>>>,
34 close_cond: Arc<(std::sync::Mutex<bool>, Condvar)>,
35 name: Arc<String>,
36}
37
38impl<MsgT> Clone for TokioActorData<MsgT>
39where
40 MsgT: ActorMsg,
41{
42 fn clone(&self) -> Self {
43 TokioActorData {
44 tx: self.tx.clone(),
45 handler_tx: self.handler_tx.clone(),
46 close_cond: self.close_cond.clone(),
47 name: self.name.clone(),
48 }
49 }
50}
51
52#[derive(Debug)]
53struct TokioActor<MsgT>
54where
55 MsgT: ActorMsg,
56{
57 data: TokioActorData<MsgT>,
58 actor_system_handle: TokioActorSystemHandle,
59 join_on_drop: bool,
60}
61
62impl<MsgT> TokioActor<MsgT>
63where
64 MsgT: ActorMsg,
65{
66 fn join(&self) {
67 let (close_mutex, cvar) = &*self.data.close_cond;
68 let mut closed = close_mutex.lock().unwrap();
69 while !*closed {
70 let runtime = self
74 .actor_system_handle
75 .actor_system
76 .lock()
77 .unwrap()
78 .async_runtime
79 .clone();
80 closed = runtime.thread_blocking(|| cvar.wait(closed).unwrap());
81 }
82 }
83}
84
85impl<MsgT> Drop for TokioActor<MsgT>
86where
87 MsgT: ActorMsg,
88{
89 fn drop(&mut self) {
90 if self.join_on_drop {
91 log::debug!("Tokio actor {} dropping, joining actor", self.data.name);
92 self.join();
93 } else {
94 log::debug!("Tokio actor {} dropping, not joining actor", self.data.name);
95 }
96 }
97}
98
99impl<MsgT> Task for TokioActor<MsgT>
100where
101 MsgT: ActorMsg,
102{
103 fn is_finished(&self) -> bool {
104 *self.data.close_cond.0.lock().unwrap()
105 }
106}
107
108#[derive(Debug)]
109pub struct TokioActorRef<MsgT>
110where
111 MsgT: ActorMsg,
112{
113 actor: Arc<TokioActor<MsgT>>,
114}
115
116impl<MsgT> Clone for TokioActorRef<MsgT>
117where
118 MsgT: ActorMsg,
119{
120 fn clone(&self) -> Self {
121 TokioActorRef {
122 actor: self.actor.clone(),
123 }
124 }
125}
126
127impl<MsgT> Task for TokioActorRef<MsgT>
128where
129 MsgT: ActorMsg,
130{
131 fn is_finished(&self) -> bool {
132 log::debug!("Starting is_finished: locking actor");
133 self.actor.is_finished()
134 }
135}
136
137impl<MsgT> Joinable<()> for TokioActorRef<MsgT>
138where
139 MsgT: ActorMsg,
140{
141 fn join(&mut self) {
142 self.actor.join();
143 }
144}
145
146impl<MsgT> ActorRef<MsgT> for TokioActorRef<MsgT>
147where
148 MsgT: ActorMsg,
149{
150 fn send(&mut self, message: MsgT, delay: Option<Duration>) {
151 let actor = &self.actor;
152 let sender = actor.data.tx.clone();
153 let mut actor_system = actor.actor_system_handle.actor_system.lock().unwrap();
154 actor_system.spawn_async_task(async move {
155 if let Some(delay_duration) = delay {
156 log::debug!("Delaying send by {:?}", delay_duration);
157 tokio::time::sleep(delay_duration).await;
158 }
159 checked_send(sender, message);
160 });
161 }
162
163 fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
164 self.actor
165 .data
166 .handler_tx
167 .send(Arc::new(tokio::sync::Mutex::new(handler)))
168 .unwrap();
169 }
170
171 fn spawn_async_send(
172 &mut self,
173 f: impl Future<Output = MsgT> + Send + 'static,
174 delay: Option<Duration>,
175 ) {
176 let mut self_clone = self.clone();
177 let mut actor_system_lock_guard =
178 self.actor.actor_system_handle.actor_system.lock().unwrap();
179 let async_runtime = actor_system_lock_guard.async_runtime.clone();
180 actor_system_lock_guard.push_async_task(async_runtime.spawn_async(async move {
181 self_clone.send(f.await, delay);
182 }));
183 }
184
185 fn spawn_thread_blocking_send(
186 &mut self,
187 f: impl FnOnce() -> MsgT + Send + 'static,
188 delay: Option<Duration>,
189 ) {
190 let self_clone = self.clone();
191 let mut actor_system_lock_guard =
192 self.actor.actor_system_handle.actor_system.lock().unwrap();
193 let async_runtime = actor_system_lock_guard.async_runtime.clone();
194 actor_system_lock_guard
195 .push_async_task(async_runtime.spawn_thread_blocking_send(f, self_clone, delay));
196 }
197}
198
199fn checked_send<MsgT>(sender: TUnboundedSender<MsgT>, message: MsgT)
200where
201 MsgT: ActorMsg,
202{
203 match sender.send(message) {
204 Ok(_) => (),
205 Err(e) => {
206 log::warn!("Send from tokio actor failed: {:?}", e);
207 }
208 }
209}
210
211#[derive(Debug)]
212pub struct TokioActorSystem {
213 async_runtime: Arc<AsyncRuntime>,
214 async_tasks: Vec<TokioTask<()>>,
215 join_tasks_on_drop: bool,
216}
217
218impl TokioActorSystem {
219 fn spawn_async_task(&mut self, future: impl std::future::Future<Output = ()> + Send + 'static) {
220 spawn_async_task(&mut self.async_tasks, &self.async_runtime, future);
221 }
222
223 fn extract_tasks(&mut self) -> (Vec<ThreadJoinable<()>>, Vec<TokioTask<()>>) {
224 (vec![], mem::take(&mut self.async_tasks))
225 }
226
227 fn create<MsgT>(
228 &mut self,
229 name: impl Into<String>,
230 node_id: impl Into<String>,
231 ) -> TokioActorData<MsgT>
232 where
233 MsgT: ActorMsg,
234 {
235 let (tx, mut rx) = tmpsc::unbounded_channel();
236 let (handler_tx, mut handler_rx) =
237 tmpsc::unbounded_channel::<Arc<tokio::sync::Mutex<DynMsgHandler<MsgT>>>>();
238 let close_cond = Arc::new((std::sync::Mutex::new(false), Condvar::new()));
239 let close_cond2 = close_cond.clone();
240 let actor_name = Arc::new(name.into());
241 let actor_name_clone = actor_name.clone();
242 let actor_node_id = node_id.into();
243 let actor_system_name = self.async_runtime.name.clone();
244 self.spawn_async_task(async move {
245 let mut current_handler = handler_rx.recv().await.unwrap();
246 log::debug!("Async actor '{}' on node '{}' started in tokio actor system '{}'", actor_name, actor_node_id, actor_system_name);
247 loop {
248 if let Ok(new_handler) = handler_rx.try_recv() {
249 log::debug!("Async actor '{}' on node '{}' in tokio actor system '{}': new handler received", actor_name, actor_node_id, actor_system_name);
250 current_handler = new_handler;
251 }
252 match rx.recv().await {
253 None => {
254 log::info!("Async actor '{}' on node '{}' in tokio actor system '{}': shutting down due to message receive channel having being closed", actor_name, actor_node_id, actor_system_name);
255 rx.close();
256 handler_rx.close();
257 notify_close(close_cond2);
258 return;
259 }
260 Some(m) => {
261 if let Some(control) = current_handler.lock().await.receive(m) {
262 match control {
263 ActorControl::Exit() => {
264 log::info!("Async actor '{}' on node '{}' in tokio actor system '{}': closing requested by handler, shutting it down", actor_name, actor_node_id, actor_system_name);
265 rx.close();
266 handler_rx.close();
267 notify_close(close_cond2);
268 return;
269 }
270 }
271 }
272 }
273 }
274 }
275 });
276 TokioActorData {
277 tx,
278 handler_tx,
279 close_cond,
280 name: actor_name_clone,
281 }
282 }
283
284 fn push_async_task(&mut self, tokio_join_handle: TokioJoinHandle<()>) {
285 push_async_task(&mut self.async_tasks, tokio_join_handle);
286 }
287}
288
289impl Task for TokioActorSystem {
290 fn is_finished(&self) -> bool {
291 self.async_tasks.iter().all(|h| h.is_finished())
292 }
293}
294
295impl Drop for TokioActorSystem {
296 fn drop(&mut self) {
297 if self.join_tasks_on_drop {
298 log::debug!(
299 "Tokio actor system '{}' dropping, joining tasks",
300 self.async_runtime.name
301 );
302 join_tasks(self.async_runtime.clone().as_ref(), self.extract_tasks());
303 } else {
304 log::debug!(
305 "Tokio actor system '{}' dropping, not joining tasks",
306 self.async_runtime.name
307 );
308 }
309 }
310}
311
312#[derive(Clone, Debug)]
313pub struct TokioActorSystemHandle {
314 actor_system: Arc<std::sync::Mutex<TokioActorSystem>>,
315}
316
317impl TokioActorSystemHandle {
318 pub fn new_actor_system(
322 name: impl Into<String>,
323 tokio: Option<Runtime>,
324 join_tasks_on_drop: bool,
325 ) -> Self {
326 let runtime = AsyncRuntime::new(name, tokio);
327 let actor_system = TokioActorSystem {
328 async_runtime: Arc::new(runtime),
329 async_tasks: vec![],
330 join_tasks_on_drop,
331 };
332 TokioActorSystemHandle {
333 actor_system: Arc::new(std::sync::Mutex::new(actor_system)),
334 }
335 }
336}
337
338impl ActorSystemHandle for TokioActorSystemHandle {
339 type ActorRefT<MsgT>
340 = TokioActorRef<MsgT>
341 where
342 MsgT: ActorMsg;
343
344 fn create<MsgT>(
345 &self,
346 node_id: impl Into<String>,
347 name: impl Into<String>,
348 join_on_drop: bool,
349 ) -> Self::ActorRefT<MsgT>
350 where
351 MsgT: ActorMsg,
352 {
353 TokioActorRef {
354 actor: Arc::new(TokioActor {
355 data: self.actor_system.lock().unwrap().create(name, node_id),
356 actor_system_handle: self.clone(),
357 join_on_drop,
358 }),
359 }
360 }
361}
362
363impl Task for TokioActorSystemHandle {
364 fn is_finished(&self) -> bool {
365 self.actor_system.lock().unwrap().is_finished()
366 }
367}
368
369impl Joinable<()> for TokioActorSystemHandle {
370 fn join(&mut self) {
371 let mut actor_system_lock_guard = self.actor_system.lock().unwrap();
372 let async_runtime = actor_system_lock_guard.async_runtime.clone();
373 let tasks = actor_system_lock_guard.extract_tasks();
374 drop(actor_system_lock_guard);
376 join_tasks(async_runtime.as_ref(), tasks);
377 }
378}
379
380pub struct TokioP2PNetworkServer {
381 socket: Arc<UdpSocket>,
382 runtime: AsyncRuntime,
383}
384
385impl TokioP2PNetworkServer {
386 pub fn new(name: impl Into<String>, socket: UdpSocket, tokio: Option<Runtime>) -> Self {
390 TokioP2PNetworkServer {
391 socket: Arc::new(socket),
392 runtime: AsyncRuntime::new(name, tokio),
393 }
394 }
395
396 pub fn start<UntypedMsgHandlerT: UntypedMsgHandler + 'static, MsgT, DeT>(
397 &self,
398 mut handler: UntypedMsgHandlerT,
399 deserializer: DeT,
400 buffer_size: usize,
401 ) -> io::Result<TokioJoinHandle<()>>
402 where
403 MsgT: ActorMsg,
404 DeT: Fn(&mut [u8]) -> P2PNetworkResult<MsgT> + Send + 'static,
405 {
406 let addr = self.socket.local_addr()?;
407 log::info!("Async actor network server '{}' starting", addr);
408 let socket = self.socket.clone();
409 Ok(self.runtime.spawn_async(async move {
410 log::info!("Async actor network server '{}' started", addr);
411 let mut buf = vec![0; buffer_size];
412 loop {
413 log::debug!("Async actor network server '{}' receiving", addr);
414 let (valid_bytes, _) = match socket.recv_from(&mut buf[..]).await {
415 Ok(bs) => bs,
416 Err(e) => {
417 log::warn!(
418 "Async actor network server '{}': datagram receive failed: {}",
419 addr,
420 e
421 );
422 continue;
423 }
424 };
425 log::debug!(
426 "Async actor network server '{}' received datagram, deserializing",
427 addr
428 );
429 let message = match deserializer(&mut buf[..valid_bytes]) {
430 Ok(m) => m,
431 Err(e) => {
432 log::warn!(
433 "Async actor network server '{}': deserialization failed: {:?}",
434 addr,
435 e
436 );
437 continue;
438 }
439 };
440 log::debug!(
441 "Async actor network server '{}' deserialized datagra, sending to input actor",
442 addr
443 );
444 if let Some(ActorControl::Exit()) = match handler.receive_untyped(Box::new(message))
445 {
446 Ok(m) => m,
447 Err(e) => {
448 log::warn!(
449 "Async actor network server '{}': handler receive failed: {:?}",
450 addr,
451 e
452 );
453 continue;
454 }
455 } {
456 log::debug!(
457 "Async actor network server '{}': input actor exited, stopping",
458 addr
459 );
460 break;
461 }
462 }
463 }))
464 }
465}
466
467#[derive(Debug, Clone)]
468pub struct TokioP2PNetworkClient {
469 runtime: Arc<AsyncRuntime>,
470 sockets: HashMap<String, P2PNetworkResult<Arc<UdpSocket>>>,
471}
472
473impl TokioP2PNetworkClient {
474 pub fn new(
478 name: impl Into<String>,
479 initial_peers: Vec<impl Into<String>>,
480 tokio: Option<Runtime>,
481 ) -> Self {
482 let runtime = Arc::new(AsyncRuntime::new(name, tokio));
483 let runtime_clone = runtime.clone();
484 let network_name = runtime.name.clone();
485 let sockets: HashMap<String, Result<Arc<UdpSocket>, P2PNetworkError>> =
488 runtime.block_on_async(async {
489 let initial_peer_addrs: Vec<String> =
490 initial_peers.into_iter().map(|p| p.into()).collect(); let initial_peer_addrs_clone = initial_peer_addrs.clone(); initial_peer_addrs
493 .into_iter()
494 .zip(
495 future::join_all(initial_peer_addrs_clone.into_iter().map(|peer_addr| {
496 let network_name = network_name.clone();
497 runtime_clone.spawn_async(async move {
498 log::debug!("Async actor network client '{}' started connecting to peer {}", network_name, peer_addr);
499 let socket = UdpSocket::bind("localhost:0").await?;
500 log::debug!(
501 "Async actor network client '{}' bound UDP socket to local address for connecting to peer {}",
502 network_name, peer_addr
503 );
504 let p_addr_clone = peer_addr.clone();
505 socket.connect(peer_addr).await.map(|_| {
506 log::debug!("Async actor network client '{}' connected UDP socket to peer {}", network_name, p_addr_clone);
507 socket
508 })
509 })
510 }))
511 .await
512 .into_iter()
513 .map(|rr| match rr {
514 Ok(r) => match r {
515 Ok(socket) => {
516 P2PNetworkResult::<Arc<UdpSocket>>::Ok(Arc::new(socket))
517 }
518 Err(e) => P2PNetworkResult::<Arc<UdpSocket>>::Err(
519 P2PNetworkError::Io(Arc::new(e)),
520 ),
521 },
522 Err(e) => P2PNetworkResult::<Arc<UdpSocket>>::Err(
523 P2PNetworkError::Join(Arc::new(e)),
524 ),
525 }),
526 )
527 .collect()
528 });
529 TokioP2PNetworkClient { runtime, sockets }
530 }
531}
532
533impl P2PNetworkClient for TokioP2PNetworkClient {
534 fn attempt_send<MsgT, SerializerT>(
535 &mut self,
536 message: MsgT,
537 serializer: SerializerT,
538 node_addr: impl Into<String>,
539 ) -> P2PNetworkResult<()>
540 where
541 MsgT: ActorMsg,
542 SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
543 {
544 attempt_send_internal(&self.runtime, &self.sockets, message, serializer, node_addr)
545 }
546}
547
548fn attempt_send_internal<MsgT, SerializerT>(
549 runtime: &AsyncRuntime,
550 sockets: &HashMap<String, P2PNetworkResult<Arc<tokio::net::UdpSocket>>>,
551 message: MsgT,
552 serializer: SerializerT,
553 node_addr: impl Into<String>,
554) -> P2PNetworkResult<()>
555where
556 MsgT: ActorMsg,
557 SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync,
558{
559 let node_addr: Arc<String> = node_addr.into().into();
560 log::debug!(
561 "Async actor network client '{}' sending to {}",
562 runtime.name,
563 node_addr
564 );
565 let socket_handle = match sockets.get(node_addr.as_ref()) {
566 Some(s) => (*s).clone(),
567 None => P2PNetworkResult::Err(P2PNetworkError::ActorNotFound(node_addr.clone())),
568 }?;
569 let serialized_message = serializer(message)?;
570 let runtime_name = runtime.name.clone();
571 runtime.block_on_async(async {
573 log::debug!(
574 "Async actor network client '{}' starting network send to {}",
575 runtime_name,
576 node_addr
577 );
578 match socket_handle.send(&serialized_message[..]).await {
579 Ok(_) => {
580 log::debug!(
581 "Async actor network client '{}' sent a message to {}",
582 runtime_name,
583 node_addr
584 );
585 Ok(())
586 }
587 Err(e) => {
588 log::warn!(
589 "Async actor network client '{}' failed to send message to {}: {:?}",
590 runtime_name,
591 node_addr,
592 e
593 );
594 Err(P2PNetworkError::Io(Arc::new(e)))
595 }
596 }
597 })
598}