bftgrid_local_example/
local.rs

1mod utils;
2
3use std::marker::PhantomData;
4
5use bftgrid_core::actor::{
6    ActorControl, ActorMsg, ActorRef, ActorSystemHandle, Joinable, TypedMsgHandler,
7};
8use bftgrid_mt::{thread::ThreadActorSystemHandle, tokio::TokioActorSystemHandle};
9
10#[derive(Clone, Debug)]
11struct Ping();
12impl ActorMsg for Ping {}
13
14#[derive(Clone, Debug)]
15struct Actor1ToActor2<Actor1RefT>
16where
17    Actor1RefT: ActorRef<Ping>,
18{
19    pub actor1_ref: Actor1RefT,
20}
21
22impl<Actor1RefT> ActorMsg for Actor1ToActor2<Actor1RefT> where
23    Actor1RefT: ActorRef<Ping> + Clone + 'static
24{
25}
26
27#[derive(Debug)]
28struct Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
29where
30    ActorSystemT: ActorSystemHandle + 'static,
31    Actor1RefT: ActorRef<Ping> + Clone + 'static,
32    Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone,
33{
34    self_ref: Actor1RefT,
35    node_id: String,
36    actor_system: ActorSystemT,
37    actor2_ref: Actor2RefT,
38    ping_count: u8,
39    spawn_count: u8,
40}
41
42impl<ActorSystemT, Actor1RefT, Actor2RefT> Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
43where
44    ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send,
45    Actor1RefT: ActorRef<Ping> + Clone + 'static,
46    Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone,
47{
48    fn new(
49        self_ref: Actor1RefT,
50        node_id: impl Into<String>,
51        actor_system: ActorSystemT,
52        actor2_ref: Actor2RefT,
53    ) -> Actor1<ActorSystemT, Actor1RefT, Actor2RefT> {
54        Actor1 {
55            self_ref,
56            node_id: node_id.into(),
57            actor_system,
58            actor2_ref,
59            ping_count: 0,
60            spawn_count: 0,
61        }
62    }
63}
64
65#[derive(Debug)]
66struct Actor2<Actor1RefT>
67where
68    Actor1RefT: ActorRef<Ping>,
69{
70    _p: PhantomData<Actor1RefT>,
71}
72
73impl<Actor1RefT> Actor2<Actor1RefT>
74where
75    Actor1RefT: ActorRef<Ping>,
76{
77    fn new() -> Self {
78        Actor2 { _p: PhantomData {} }
79    }
80}
81
82impl<Actor1RefT> TypedMsgHandler<Actor1ToActor2<Actor1RefT>> for Actor2<Actor1RefT>
83where
84    Actor1RefT: ActorRef<Ping> + Clone + 'static,
85{
86    fn receive(&mut self, mut msg: Actor1ToActor2<Actor1RefT>) -> Option<ActorControl> {
87        log::info!("Actor2 received ref, sending ping to it");
88        msg.actor1_ref.send(Ping(), None);
89        log::info!("Actor2 sent ping, exiting");
90        Some(ActorControl::Exit())
91    }
92}
93
94impl<ActorSystemT, Actor1RefT, Actor2RefT> TypedMsgHandler<Ping>
95    for Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
96where
97    ActorSystemT: ActorSystemHandle + std::fmt::Debug + Clone + Send + 'static,
98    Actor1RefT: ActorRef<Ping> + Clone + 'static,
99    Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone + 'static,
100{
101    fn receive(&mut self, _msg: Ping) -> Option<ActorControl> {
102        let ret = match self.ping_count {
103            0 => {
104                log::info!("Actor1 received first ping, sending ref to Actor2");
105                let self_ref = self.self_ref.clone();
106                self.actor2_ref.send(
107                    Actor1ToActor2 {
108                        actor1_ref: self_ref,
109                    },
110                    None,
111                );
112                None
113            }
114            1 => {
115                log::info!("Actor1 received second ping, self-pinging after async work");
116                self.self_ref.spawn_async_send(
117                    async move {
118                        log::info!("Actor1 simulating async work");
119                        Ping()
120                    },
121                    None,
122                );
123                None
124            }
125            _ => {
126                log::info!("Actor1 received third ping");
127                log::info!("Actor1 simulating async work");
128                if self.spawn_count < 1 {
129                    log::info!("Actor1 spawning");
130                    let mut new_ref = self.actor_system.create::<Ping>(
131                        self.node_id.clone(),
132                        self.spawn_count.to_string(),
133                        false,
134                    );
135                    log::info!("Actor1 setting handler");
136                    new_ref.set_handler(Box::new(Actor1 {
137                        self_ref: self.self_ref.clone(),
138                        node_id: self.node_id.clone(),
139                        actor_system: self.actor_system.clone(),
140                        actor2_ref: self.actor2_ref.clone(),
141                        ping_count: 3,
142                        spawn_count: self.spawn_count + 1,
143                    }));
144                    log::info!("Actor1 sending");
145                    new_ref.send(Ping(), None);
146                    log::info!("Actor1 done");
147                }
148                log::info!("Actor1 exiting");
149                Some(ActorControl::Exit())
150            }
151        };
152        self.ping_count += 1;
153        ret
154    }
155}
156
157struct System<Actor1ActorSystemT, Actor2ActorSystemT>
158where
159    Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send + 'static,
160    Actor2ActorSystemT: ActorSystemHandle + 'static,
161    Actor1ActorSystemT::ActorRefT<Ping>: Clone,
162{
163    actor1_ref: Actor1ActorSystemT::ActorRefT<Ping>,
164    actor2_ref: Actor2ActorSystemT::ActorRefT<Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>>,
165    actor1_actor_system_type: PhantomData<Actor1ActorSystemT>,
166}
167
168impl<Actor1ActorSystemT, Actor2ActorSystemT> System<Actor1ActorSystemT, Actor2ActorSystemT>
169where
170    Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send + 'static,
171    Actor2ActorSystemT: ActorSystemHandle + 'static,
172    Actor1ActorSystemT::ActorRefT<Ping>: Clone,
173{
174    fn new(
175        actor1_ref: Actor1ActorSystemT::ActorRefT<Ping>,
176        actor2_ref: Actor2ActorSystemT::ActorRefT<
177            Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>,
178        >,
179    ) -> Self {
180        System {
181            actor1_ref,
182            actor2_ref,
183            actor1_actor_system_type: PhantomData {},
184        }
185    }
186}
187
188fn build_system<Actor1ActorSystemT, Actor2ActorSystemT>(
189    actor1_actor_system: Actor1ActorSystemT,
190    actor2_actor_system: Actor2ActorSystemT,
191) -> System<Actor1ActorSystemT, Actor2ActorSystemT>
192where
193    Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Clone + Send + 'static,
194    Actor2ActorSystemT: ActorSystemHandle + 'static,
195    Actor1ActorSystemT::ActorRefT<Ping>: Clone,
196    Actor2ActorSystemT::ActorRefT<Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>>: Clone,
197{
198    let mut actor1_ref = actor1_actor_system.create("node", "actor1", false);
199    let actor1_ref_copy = actor1_ref.clone();
200    let mut actor2_ref = actor2_actor_system.create("node", "actor2", false);
201    let actor2_ref_copy = actor2_ref.clone();
202    actor2_ref.set_handler(Box::new(
203        Actor2::<Actor1ActorSystemT::ActorRefT<Ping>>::new(),
204    ));
205    actor1_ref.set_handler(Box::new(Actor1::new(
206        actor1_ref_copy,
207        "node",
208        actor1_actor_system.clone(),
209        actor2_ref_copy,
210    )));
211    System::new(actor1_ref, actor2_ref)
212}
213
214// Components that need a Tokio runtime will reuse the one from the async context, if any,
215//  otherwise they will create a new one.
216#[tokio::main]
217async fn main() {
218    utils::setup_logging(false);
219    let mut thread_actor_system =
220        ThreadActorSystemHandle::new_actor_system("thread-as", None, false);
221    let mut tokio_actor_system = TokioActorSystemHandle::new_actor_system("tokio-as", None, false);
222    let System {
223        mut actor1_ref,
224        mut actor2_ref,
225        ..
226    } = build_system(thread_actor_system.clone(), tokio_actor_system.clone());
227    actor1_ref.send(Ping(), None);
228    actor2_ref.join();
229    log::info!("Joined Actor2");
230    actor1_ref.join();
231    log::info!("Joined Actor1");
232    tokio_actor_system.join();
233    thread_actor_system.join();
234}
235
#[cfg(test)]
mod tests {
    use std::{
        collections::HashMap,
        time::{Duration, Instant},
    };

    use bftgrid_core::actor::ActorRef;
    use bftgrid_sim::{NodeDescriptor, Simulation};

    use crate::{Ping, System, build_system, utils};

    /// Runs the same two-actor exchange with a `Simulation` standing in for
    /// both actor systems, then logs the resulting history.
    ///
    /// The test makes no assertions on the history.
    #[test]
    fn simulation() {
        utils::setup_logging(true);

        // Single-node topology matching the "node" id used by `build_system`.
        let mut topology = HashMap::new();
        topology.insert("node".into(), NodeDescriptor::default());

        // Simulated time window: from now to 100 seconds later.
        let start = Instant::now();
        let end = start + Duration::from_secs(100);
        let simulation = Simulation::new(topology, start, end);

        let System { mut actor1_ref, .. } = build_system(simulation.clone(), simulation.clone());
        actor1_ref.send(Ping(), None);

        let history = simulation.run();
        log::info!("{:?}", history);
    }
}