1use std::{
10 any::Any,
11 error::Error,
12 fmt::{Debug, Display, Error as FmtError, Formatter},
13 io,
14 sync::Arc,
15 time::Duration,
16};
17
18use dyn_clone::{DynClone, clone_trait_object};
19use thiserror::Error;
20use tokio::task::JoinError;
21
/// Marker trait for values that can be passed around as actor messages.
///
/// It declares no methods; an implementor only has to satisfy the supertraits:
/// `DynClone` (so boxed messages can be cloned), `Any` (used by the `erased`
/// module to downcast boxed messages back to their concrete type), `Send`
/// and `Debug`.
pub trait ActorMsg: DynClone + Any + Send + Debug {}
// Generates the `Clone` implementation for `Box<dyn ActorMsg>`.
clone_trait_object!(ActorMsg);
24
/// Control signal that a message handler may return from `receive`.
///
/// NOTE(review): how the actor runtime reacts to each variant is not visible
/// in this file.
pub enum ActorControl {
    /// Presumably requests that the actor stop — confirm against the runtime.
    Exit(),
}
28
/// Handler for messages of a single, statically known type `MsgT`.
///
/// Handlers are required to be `Send`.
pub trait TypedMsgHandler<MsgT>: Send
where
    MsgT: ActorMsg,
{
    /// Processes one `message`, taking ownership of it.
    ///
    /// Returns `Some(control)` to hand an [`ActorControl`] signal back to the
    /// caller, or `None` when there is nothing to signal.
    fn receive(&mut self, message: MsgT) -> Option<ActorControl>;
}
37
/// Boxed, dynamically dispatched [`TypedMsgHandler`] for messages of type `MsgT`.
pub type DynMsgHandler<MsgT> = Box<dyn TypedMsgHandler<MsgT>>;
39
/// A type-erased actor message: any [`ActorMsg`] behind a `Box`.
pub type AnActorMsg = Box<dyn ActorMsg>;
41
// A boxed message is itself an `ActorMsg`, so `AnActorMsg` can be used as the
// `MsgT` parameter of the generic traits (the `erased` module relies on this).
impl ActorMsg for AnActorMsg {}
43
/// Error value carried by the `Err` side of
/// `UntypedMsgHandler::receive_untyped`.
///
/// The type holds no data. Besides `Debug` and `Clone` it also derives
/// `PartialEq`, `Eq` and `Default`, so callers and tests can compare results
/// that contain it directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MessageNotSupported();
46
47impl Display for MessageNotSupported {
48 fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
49 write!(f, "Message not supported")
50 }
51}
52
// Uses the default `Error` methods; the description comes from `Display`.
impl Error for MessageNotSupported {}
54
/// Handler that accepts type-erased messages ([`AnActorMsg`]).
///
/// Handlers are required to be `Send`.
pub trait UntypedMsgHandler: Send {
    /// Processes one boxed `message`, taking ownership of it.
    ///
    /// On success the result is the same `Option<ActorControl>` that a
    /// [`TypedMsgHandler`] returns.
    ///
    /// # Errors
    ///
    /// May return [`MessageNotSupported`]; presumably when the concrete type
    /// of `message` is not one the handler understands — confirm against
    /// the implementations.
    fn receive_untyped(
        &mut self,
        message: AnActorMsg,
    ) -> Result<Option<ActorControl>, MessageNotSupported>;
}
63
/// Boxed, dynamically dispatched [`UntypedMsgHandler`].
pub type UntypedHandlerBox = Box<dyn UntypedMsgHandler>;
65
/// Something whose completion can be queried.
///
/// NOTE(review): what exactly counts as "finished" is decided by the
/// implementors, which are not visible in this file.
pub trait Task: Send + Debug {
    /// Returns `true` once the task has finished.
    fn is_finished(&self) -> bool;
}
70
/// A [`Task`] that can be joined to obtain a value of type `Output`.
///
/// `Send + Debug` are repeated here although [`Task`] already requires them.
pub trait Joinable<Output>: Task + Send + Debug {
    /// Joins the task and returns its `Output`.
    ///
    /// NOTE(review): presumably blocks until the task completes — confirm
    /// against the implementations.
    fn join(&mut self) -> Output;
}
76
/// Cloneable, `Send` handle to an actor that accepts messages of type `MsgT`.
///
/// Two methods take `impl Trait` arguments; the `erased` module provides a
/// counterpart trait whose methods take boxed arguments and which is used as
/// a trait object.
pub trait ActorRef<MsgT>: DynClone + Send + Debug
where
    MsgT: ActorMsg + 'static,
{
    /// Sends `message` to the actor.
    ///
    /// NOTE(review): `delay` presumably postpones delivery by the given
    /// duration, with `None` meaning no delay — confirm against implementations.
    fn send(&mut self, message: MsgT, delay: Option<Duration>);

    /// Installs `handler` as the actor's message handler.
    ///
    /// NOTE(review): presumably replaces any previously installed handler — confirm.
    fn set_handler(&mut self, handler: DynMsgHandler<MsgT>);

    /// Takes a future `f` that produces a message.
    ///
    /// NOTE(review): judging by the name, `f` is spawned asynchronously and
    /// its output is then sent to the actor, honoring `delay` — confirm
    /// against implementations.
    fn spawn_async_send(
        &mut self,
        f: impl Future<Output = MsgT> + Send + 'static,
        delay: Option<Duration>,
    );

    /// Takes a closure `f` that produces a message.
    ///
    /// NOTE(review): judging by the name, `f` is run on a thread where
    /// blocking is acceptable and its result is then sent to the actor,
    /// honoring `delay` — confirm against implementations.
    fn spawn_thread_blocking_send(
        &mut self,
        f: impl FnOnce() -> MsgT + Send + 'static,
        delay: Option<Duration>,
    );
}
100
/// Cloneable, `Send` handle to an actor system that can create actors.
pub trait ActorSystemHandle: DynClone + Send {
    /// Concrete [`ActorRef`] type this system hands out for messages of type `MsgT`.
    type ActorRefT<MsgT>: ActorRef<MsgT>
    where
        MsgT: ActorMsg;

    /// Creates an actor and returns a reference to it.
    ///
    /// `node_id` and `name` accept anything convertible into a `String`.
    ///
    /// NOTE(review): the meaning of `node_id` and of `join_on_drop`
    /// (presumably: join the actor when its reference is dropped) is not
    /// visible in this file — confirm against implementations.
    fn create<MsgT>(
        &self,
        node_id: impl Into<String>,
        name: impl Into<String>,
        join_on_drop: bool,
    ) -> Self::ActorRefT<MsgT>
    where
        MsgT: ActorMsg;
}
117
/// Errors reported by the peer-to-peer networking interface.
///
/// The enum derives `Clone`; the wrapped `io::Error` and `JoinError` are held
/// behind `Arc`, presumably because neither is `Clone` on its own.
#[derive(Error, Debug, Clone)]
pub enum P2PNetworkError {
    /// An I/O error; convertible from `Arc<io::Error>` via `From`.
    #[error("I/O error")]
    Io(#[from] Arc<io::Error>),
    /// A task join error; convertible from `Arc<JoinError>` via `From`.
    #[error("Join error")]
    Join(#[from] Arc<JoinError>),
    /// An actor could not be found.
    ///
    /// The payload is not included in the rendered message.
    /// NOTE(review): the string is presumably the actor's name — confirm.
    #[error("Actor not found")]
    ActorNotFound(Arc<String>),
    /// The message's type is not supported (the `erased` module returns this
    /// when a boxed message cannot be downcast to the expected type).
    #[error("Message type not supported")]
    MessageNotSupported,
}
129
/// `Result` alias whose error type is [`P2PNetworkError`].
pub type P2PNetworkResult<R> = Result<R, P2PNetworkError>;
131
/// Cloneable client for sending actor messages to other nodes.
pub trait P2PNetworkClient: DynClone {
    /// Attempts to send `message` to `node`.
    ///
    /// `serializer` turns a message into bytes and may itself fail with a
    /// [`P2PNetworkError`]. `node` accepts anything convertible into a `String`.
    ///
    /// # Errors
    ///
    /// Returns a [`P2PNetworkError`] on failure.
    /// NOTE(review): which variants occur under which conditions depends on
    /// the implementation, which is not visible in this file.
    fn attempt_send<MsgT, SerializerT>(
        &mut self,
        message: MsgT,
        serializer: SerializerT,
        node: impl Into<String>,
    ) -> P2PNetworkResult<()>
    where
        MsgT: ActorMsg,
        SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync + 'static;
}
144
145pub mod erased {
147 use std::any::Any;
148 use std::fmt::Debug;
149 use std::marker::PhantomData;
150 use std::{pin::Pin, time::Duration};
151
152 use dyn_clone::DynClone;
153
154 use crate::actor;
155
156 use super::{ActorMsg, AnActorMsg, DynMsgHandler, P2PNetworkResult, TypedMsgHandler};
157
/// Pinned, boxed, `Send + 'static` future that resolves to a `MsgT`.
pub type DynFuture<MsgT> = Pin<Box<dyn Future<Output = MsgT> + Send + 'static>>;
/// Boxed, `Send + 'static` one-shot closure that produces a `MsgT` when called.
pub type DynLazy<MsgT> = Box<dyn FnOnce() -> MsgT + Send + 'static>;
160
/// Counterpart of `actor::ActorRef` whose methods take boxed arguments
/// ([`DynFuture`], [`DynLazy`]) instead of `impl Trait`, so that it can be
/// used as a trait object (see [`DynActorRef`]).
pub trait ActorRef<MsgT>: DynClone + Send + Debug
where
    MsgT: ActorMsg + 'static,
{
    /// Boxed-argument counterpart of `actor::ActorRef::send`.
    fn send(&mut self, message: MsgT, delay: Option<Duration>);

    /// Boxed-argument counterpart of `actor::ActorRef::set_handler`.
    fn set_handler(&mut self, handler: DynMsgHandler<MsgT>);

    /// Counterpart of `actor::ActorRef::spawn_async_send` taking an already
    /// boxed and pinned future.
    fn spawn_async_send(&mut self, f: DynFuture<MsgT>, delay: Option<Duration>);

    /// Counterpart of `actor::ActorRef::spawn_thread_blocking_send` taking an
    /// already boxed closure.
    fn spawn_thread_blocking_send(&mut self, f: DynLazy<MsgT>, delay: Option<Duration>);
}
173
/// Boxed trait object of the boxed-argument [`ActorRef`] for messages of type `MsgT`.
pub type DynActorRef<MsgT> = Box<dyn ActorRef<MsgT>>;
175
176 impl<MsgT> Clone for DynActorRef<MsgT> {
177 fn clone(&self) -> Self {
178 dyn_clone::clone_box(&**self)
179 }
180 }
181
182 impl<ActorRefT, MsgT> ActorRef<MsgT> for ActorRefT
183 where
184 ActorRefT: actor::ActorRef<MsgT> + ?Sized,
185 MsgT: ActorMsg + 'static,
186 {
187 fn send(&mut self, message: MsgT, delay: Option<Duration>) {
188 self.send(message, delay);
189 }
190
191 fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
192 self.set_handler(handler);
193 }
194
195 fn spawn_async_send(&mut self, f: DynFuture<MsgT>, delay: Option<Duration>) {
196 self.spawn_async_send(f, delay);
197 }
198
199 fn spawn_thread_blocking_send(&mut self, f: DynLazy<MsgT>, delay: Option<Duration>) {
200 self.spawn_thread_blocking_send(f, delay);
201 }
202 }
203
204 impl<MsgT> actor::ActorRef<MsgT> for dyn ActorRef<MsgT>
205 where
206 MsgT: ActorMsg + 'static,
207 {
208 fn send(&mut self, message: MsgT, delay: Option<Duration>) {
209 ActorRef::send(self, message, delay);
210 }
211
212 fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
213 ActorRef::set_handler(self, handler);
214 }
215
216 fn spawn_async_send(
217 &mut self,
218 f: impl Future<Output = MsgT> + Send + 'static,
219 delay: Option<Duration>,
220 ) {
221 ActorRef::spawn_async_send(self, Box::pin(f), delay);
222 }
223
224 fn spawn_thread_blocking_send(
225 &mut self,
226 f: impl FnOnce() -> MsgT + Send + 'static,
227 delay: Option<Duration>,
228 ) {
229 ActorRef::spawn_thread_blocking_send(self, Box::new(f), delay);
230 }
231 }
232
/// Counterpart of `actor::ActorSystemHandle` without generic methods or
/// associated types, so that it can be used as a trait object
/// (see [`DynActorSystemHandle`]).
pub trait ActorSystemHandle: DynClone + Send {
    /// Creates an actor and returns a boxed reference that accepts
    /// type-erased messages ([`AnActorMsg`]).
    ///
    /// The parameters mirror those of `actor::ActorSystemHandle::create`, with
    /// `node_id` and `name` already converted to `String`.
    fn create(
        &self,
        node_id: String,
        name: String,
        join_on_drop: bool,
    ) -> DynActorRef<AnActorMsg>;
}
241
/// Boxed trait object of the non-generic [`ActorSystemHandle`].
pub type DynActorSystemHandle = Box<dyn ActorSystemHandle>;
243
244 impl Clone for DynActorSystemHandle {
245 fn clone(&self) -> Self {
246 dyn_clone::clone_box(&**self)
247 }
248 }
249
250 impl<ActorSystemHandleT> ActorSystemHandle for ActorSystemHandleT
251 where
252 ActorSystemHandleT: actor::ActorSystemHandle + ?Sized + 'static,
253 {
254 fn create(
255 &self,
256 node_id: String,
257 name: String,
258 join_on_drop: bool,
259 ) -> DynActorRef<AnActorMsg> {
260 Box::new(self.create(node_id, name, join_on_drop))
261 }
262 }
263
/// Typed wrapper around a type-erased actor reference.
///
/// Holds a [`DynActorRef`] that only accepts boxed messages, while `MsgT`
/// records at the type level which concrete message type this wrapper
/// sends and handles.
#[derive(Debug)]
pub struct DynActorRefWrapped<MsgT>
where
    MsgT: ActorMsg,
{
    // Underlying reference; every message passes through it as `AnActorMsg`.
    an_actor_ref: DynActorRef<AnActorMsg>,
    // Zero-sized marker tying the wrapper to `MsgT` without storing a value.
    _p: PhantomData<MsgT>,
}
272
273 impl<MsgT> Clone for DynActorRefWrapped<MsgT>
274 where
275 MsgT: ActorMsg,
276 {
277 fn clone(&self) -> Self {
278 DynActorRefWrapped {
279 an_actor_ref: self.an_actor_ref.clone(),
280 _p: PhantomData {},
281 }
282 }
283 }
284
/// Adapter that lets a handler for the concrete type `MsgT` be installed on an
/// actor reference that delivers boxed messages ([`AnActorMsg`]).
struct DowncastingMsgHandler<MsgT>
where
    MsgT: ActorMsg,
{
    // The typed handler that receives successfully downcast messages.
    dyn_msg_handler: DynMsgHandler<MsgT>,
}
291
292 impl<MsgT> TypedMsgHandler<AnActorMsg> for DowncastingMsgHandler<MsgT>
293 where
294 MsgT: ActorMsg,
295 {
296 fn receive(&mut self, message: AnActorMsg) -> Option<actor::ActorControl> {
297 match (message as Box<dyn Any>).downcast::<MsgT>() {
298 Ok(m) => self.dyn_msg_handler.receive(*m),
299 Err(_) => None,
300 }
301 }
302 }
303
304 impl<MsgT> actor::ActorRef<MsgT> for DynActorRefWrapped<MsgT>
305 where
306 MsgT: ActorMsg,
307 {
308 fn send(&mut self, message: MsgT, delay: Option<Duration>) {
309 self.an_actor_ref
310 .send(Box::new(message) as Box<dyn ActorMsg>, delay);
311 }
312
313 fn set_handler(&mut self, handler: DynMsgHandler<MsgT>) {
314 self.an_actor_ref
315 .set_handler(Box::new(DowncastingMsgHandler::<MsgT> {
316 dyn_msg_handler: handler,
317 }));
318 }
319
320 fn spawn_async_send(
321 &mut self,
322 f: impl Future<Output = MsgT> + Send + 'static,
323 delay: Option<Duration>,
324 ) {
325 self.an_actor_ref.spawn_async_send(
326 Box::pin(async move { Box::new(f.await) as Box<dyn ActorMsg> }),
327 delay,
328 );
329 }
330
331 fn spawn_thread_blocking_send(
332 &mut self,
333 f: impl FnOnce() -> MsgT + Send + 'static,
334 delay: Option<Duration>,
335 ) {
336 self.an_actor_ref
337 .spawn_thread_blocking_send(Box::new(|| Box::new(f())), delay);
338 }
339 }
340
341 impl actor::ActorSystemHandle for dyn ActorSystemHandle {
342 type ActorRefT<MsgT>
343 = DynActorRefWrapped<MsgT>
344 where
345 MsgT: ActorMsg;
346
347 fn create<MsgT>(
348 &self,
349 node_id: impl Into<String>,
350 name: impl Into<String>,
351 join_on_drop: bool,
352 ) -> Self::ActorRefT<MsgT>
353 where
354 MsgT: ActorMsg,
355 {
356 DynActorRefWrapped {
357 an_actor_ref: ActorSystemHandle::create(
358 self,
359 node_id.into(),
360 name.into(),
361 join_on_drop,
362 ),
363 _p: PhantomData {},
364 }
365 }
366 }
367
/// Counterpart of `actor::P2PNetworkClient` without generic methods, so that
/// it can be used as a trait object (see [`DynP2PNetworkClient`]).
///
/// Unlike the generic trait, this one additionally requires `Send`.
pub trait P2PNetworkClient: DynClone + Send {
    /// Attempts to send the boxed `message` to `node`, using the boxed
    /// `serializer` to turn the message into bytes.
    ///
    /// # Errors
    ///
    /// Returns a [`P2PNetworkResult`] error on failure.
    /// NOTE(review): the exact failure conditions depend on the implementation.
    fn attempt_send(
        &mut self,
        message: AnActorMsg,
        serializer: Box<dyn Fn(AnActorMsg) -> P2PNetworkResult<Vec<u8>> + Sync>,
        node: String,
    ) -> P2PNetworkResult<()>;
}
376
/// Boxed trait object of the non-generic [`P2PNetworkClient`].
pub type DynP2PNetworkClient = Box<dyn P2PNetworkClient>;
378
379 impl Clone for DynP2PNetworkClient {
380 fn clone(&self) -> Self {
381 dyn_clone::clone_box(&**self)
382 }
383 }
384
385 impl<P2PNetworkClientT> P2PNetworkClient for P2PNetworkClientT
386 where
387 P2PNetworkClientT: actor::P2PNetworkClient + Send + ?Sized + 'static,
388 {
389 fn attempt_send(
390 &mut self,
391 message: AnActorMsg,
392 serializer: Box<dyn Fn(AnActorMsg) -> P2PNetworkResult<Vec<u8>> + Sync>,
393 node: String,
394 ) -> P2PNetworkResult<()> {
395 self.attempt_send(message, serializer, node)
396 }
397 }
398
399 impl actor::P2PNetworkClient for dyn P2PNetworkClient {
400 fn attempt_send<MsgT, SerializerT>(
401 &mut self,
402 message: MsgT,
403 serializer: SerializerT,
404 node: impl Into<String>,
405 ) -> P2PNetworkResult<()>
406 where
407 MsgT: ActorMsg,
408 SerializerT: Fn(MsgT) -> P2PNetworkResult<Vec<u8>> + Sync + 'static,
409 {
410 P2PNetworkClient::attempt_send(
411 self,
412 Box::new(message),
413 Box::new(move |msg| match (msg as Box<dyn Any>).downcast::<MsgT>() {
414 Ok(typed_msg) => serializer(*typed_msg),
415 Err(_) => Err(actor::P2PNetworkError::MessageNotSupported),
416 }),
417 node.into(),
418 )
419 }
420 }
421}