1mod utils;
2
3use std::marker::PhantomData;
4
5use bftgrid_core::actor::{
6 ActorControl, ActorMsg, ActorRef, ActorSystemHandle, Joinable, TypedMsgHandler,
7};
8use bftgrid_mt::{thread::ThreadActorSystemHandle, tokio::TokioActorSystemHandle};
9
10#[derive(Clone, Debug)]
11struct Ping();
12impl ActorMsg for Ping {}
13
14#[derive(Clone, Debug)]
15struct Actor1ToActor2<Actor1RefT>
16where
17 Actor1RefT: ActorRef<Ping>,
18{
19 pub actor1_ref: Actor1RefT,
20}
21
22impl<Actor1RefT> ActorMsg for Actor1ToActor2<Actor1RefT> where
23 Actor1RefT: ActorRef<Ping> + Clone + 'static
24{
25}
26
27#[derive(Debug)]
28struct Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
29where
30 ActorSystemT: ActorSystemHandle + 'static,
31 Actor1RefT: ActorRef<Ping> + Clone + 'static,
32 Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone,
33{
34 self_ref: Actor1RefT,
35 node_id: String,
36 actor_system: ActorSystemT,
37 actor2_ref: Actor2RefT,
38 ping_count: u8,
39 spawn_count: u8,
40}
41
42impl<ActorSystemT, Actor1RefT, Actor2RefT> Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
43where
44 ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send,
45 Actor1RefT: ActorRef<Ping> + Clone + 'static,
46 Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone,
47{
48 fn new(
49 self_ref: Actor1RefT,
50 node_id: impl Into<String>,
51 actor_system: ActorSystemT,
52 actor2_ref: Actor2RefT,
53 ) -> Actor1<ActorSystemT, Actor1RefT, Actor2RefT> {
54 Actor1 {
55 self_ref,
56 node_id: node_id.into(),
57 actor_system,
58 actor2_ref,
59 ping_count: 0,
60 spawn_count: 0,
61 }
62 }
63}
64
65#[derive(Debug)]
66struct Actor2<Actor1RefT>
67where
68 Actor1RefT: ActorRef<Ping>,
69{
70 _p: PhantomData<Actor1RefT>,
71}
72
73impl<Actor1RefT> Actor2<Actor1RefT>
74where
75 Actor1RefT: ActorRef<Ping>,
76{
77 fn new() -> Self {
78 Actor2 { _p: PhantomData {} }
79 }
80}
81
82impl<Actor1RefT> TypedMsgHandler<Actor1ToActor2<Actor1RefT>> for Actor2<Actor1RefT>
83where
84 Actor1RefT: ActorRef<Ping> + Clone + 'static,
85{
86 fn receive(&mut self, mut msg: Actor1ToActor2<Actor1RefT>) -> Option<ActorControl> {
87 log::info!("Actor2 received ref, sending ping to it");
88 msg.actor1_ref.send(Ping(), None);
89 log::info!("Actor2 sent ping, exiting");
90 Some(ActorControl::Exit())
91 }
92}
93
94impl<ActorSystemT, Actor1RefT, Actor2RefT> TypedMsgHandler<Ping>
95 for Actor1<ActorSystemT, Actor1RefT, Actor2RefT>
96where
97 ActorSystemT: ActorSystemHandle + std::fmt::Debug + Clone + Send + 'static,
98 Actor1RefT: ActorRef<Ping> + Clone + 'static,
99 Actor2RefT: ActorRef<Actor1ToActor2<Actor1RefT>> + Clone + 'static,
100{
101 fn receive(&mut self, _msg: Ping) -> Option<ActorControl> {
102 let ret = match self.ping_count {
103 0 => {
104 log::info!("Actor1 received first ping, sending ref to Actor2");
105 let self_ref = self.self_ref.clone();
106 self.actor2_ref.send(
107 Actor1ToActor2 {
108 actor1_ref: self_ref,
109 },
110 None,
111 );
112 None
113 }
114 1 => {
115 log::info!("Actor1 received second ping, self-pinging after async work");
116 self.self_ref.spawn_async_send(
117 async move {
118 log::info!("Actor1 simulating async work");
119 Ping()
120 },
121 None,
122 );
123 None
124 }
125 _ => {
126 log::info!("Actor1 received third ping");
127 log::info!("Actor1 simulating async work");
128 if self.spawn_count < 1 {
129 log::info!("Actor1 spawning");
130 let mut new_ref = self.actor_system.create::<Ping>(
131 self.node_id.clone(),
132 self.spawn_count.to_string(),
133 false,
134 );
135 log::info!("Actor1 setting handler");
136 new_ref.set_handler(Box::new(Actor1 {
137 self_ref: self.self_ref.clone(),
138 node_id: self.node_id.clone(),
139 actor_system: self.actor_system.clone(),
140 actor2_ref: self.actor2_ref.clone(),
141 ping_count: 3,
142 spawn_count: self.spawn_count + 1,
143 }));
144 log::info!("Actor1 sending");
145 new_ref.send(Ping(), None);
146 log::info!("Actor1 done");
147 }
148 log::info!("Actor1 exiting");
149 Some(ActorControl::Exit())
150 }
151 };
152 self.ping_count += 1;
153 ret
154 }
155}
156
157struct System<Actor1ActorSystemT, Actor2ActorSystemT>
158where
159 Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send + 'static,
160 Actor2ActorSystemT: ActorSystemHandle + 'static,
161 Actor1ActorSystemT::ActorRefT<Ping>: Clone,
162{
163 actor1_ref: Actor1ActorSystemT::ActorRefT<Ping>,
164 actor2_ref: Actor2ActorSystemT::ActorRefT<Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>>,
165 actor1_actor_system_type: PhantomData<Actor1ActorSystemT>,
166}
167
168impl<Actor1ActorSystemT, Actor2ActorSystemT> System<Actor1ActorSystemT, Actor2ActorSystemT>
169where
170 Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Send + 'static,
171 Actor2ActorSystemT: ActorSystemHandle + 'static,
172 Actor1ActorSystemT::ActorRefT<Ping>: Clone,
173{
174 fn new(
175 actor1_ref: Actor1ActorSystemT::ActorRefT<Ping>,
176 actor2_ref: Actor2ActorSystemT::ActorRefT<
177 Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>,
178 >,
179 ) -> Self {
180 System {
181 actor1_ref,
182 actor2_ref,
183 actor1_actor_system_type: PhantomData {},
184 }
185 }
186}
187
188fn build_system<Actor1ActorSystemT, Actor2ActorSystemT>(
189 actor1_actor_system: Actor1ActorSystemT,
190 actor2_actor_system: Actor2ActorSystemT,
191) -> System<Actor1ActorSystemT, Actor2ActorSystemT>
192where
193 Actor1ActorSystemT: ActorSystemHandle + std::fmt::Debug + Clone + Send + 'static,
194 Actor2ActorSystemT: ActorSystemHandle + 'static,
195 Actor1ActorSystemT::ActorRefT<Ping>: Clone,
196 Actor2ActorSystemT::ActorRefT<Actor1ToActor2<Actor1ActorSystemT::ActorRefT<Ping>>>: Clone,
197{
198 let mut actor1_ref = actor1_actor_system.create("node", "actor1", false);
199 let actor1_ref_copy = actor1_ref.clone();
200 let mut actor2_ref = actor2_actor_system.create("node", "actor2", false);
201 let actor2_ref_copy = actor2_ref.clone();
202 actor2_ref.set_handler(Box::new(
203 Actor2::<Actor1ActorSystemT::ActorRefT<Ping>>::new(),
204 ));
205 actor1_ref.set_handler(Box::new(Actor1::new(
206 actor1_ref_copy,
207 "node",
208 actor1_actor_system.clone(),
209 actor2_ref_copy,
210 )));
211 System::new(actor1_ref, actor2_ref)
212}
213
214#[tokio::main]
217async fn main() {
218 utils::setup_logging(false);
219 let mut thread_actor_system =
220 ThreadActorSystemHandle::new_actor_system("thread-as", None, false);
221 let mut tokio_actor_system = TokioActorSystemHandle::new_actor_system("tokio-as", None, false);
222 let System {
223 mut actor1_ref,
224 mut actor2_ref,
225 ..
226 } = build_system(thread_actor_system.clone(), tokio_actor_system.clone());
227 actor1_ref.send(Ping(), None);
228 actor2_ref.join();
229 log::info!("Joined Actor2");
230 actor1_ref.join();
231 log::info!("Joined Actor1");
232 tokio_actor_system.join();
233 thread_actor_system.join();
234}
235
236#[cfg(test)]
237mod tests {
238 use std::{
239 collections::HashMap,
240 ops::Add,
241 time::{Duration, Instant},
242 };
243
244 use bftgrid_core::actor::ActorRef;
245 use bftgrid_sim::{NodeDescriptor, Simulation};
246
247 use crate::{Ping, System, build_system, utils};
248
249 #[test]
250 fn simulation() {
251 utils::setup_logging(true);
252 let mut topology = HashMap::new();
253 topology.insert("node".into(), NodeDescriptor::default());
254 let start = Instant::now();
255 let simulation = Simulation::new(topology, start, start.add(Duration::from_secs(100)));
256 let System { mut actor1_ref, .. } = build_system(simulation.clone(), simulation.clone());
257 actor1_ref.send(Ping(), None);
258 let history = simulation.run();
259 log::info!("{:?}", history);
260 }
261}