1mod utils;
2
3use std::any::Any;
4
5use bftgrid_core::actor::{
6 ActorControl, ActorMsg, ActorRef, ActorSystemHandle, AnActorMsg, Joinable, MessageNotSupported,
7 P2PNetworkClient, TypedMsgHandler, UntypedMsgHandler,
8 erased::{DynActorRef, DynActorSystemHandle, DynP2PNetworkClient},
9};
10
11use bftgrid_mt::{
12 AsyncRuntime,
13 thread::ThreadActorSystemHandle,
14 tokio::{TokioActorSystemHandle, TokioP2PNetworkClient, TokioP2PNetworkServer},
15};
16use tokio::net::UdpSocket;
17
/// The only message exchanged in this example.
///
/// It carries no payload; the handlers in this file only check that an incoming
/// message downcasts to `Ping`. Elsewhere in this file it is constructed both as
/// `Ping()` and as `Ping {}`.
#[derive(Clone, Debug)]
struct Ping();
// Marker impl with no items: makes `Ping` usable as an actor message.
impl ActorMsg for Ping {}
21
/// State of the first actor of the ping-pong example.
///
/// Its `TypedMsgHandler::receive` implementation reacts differently depending on
/// how many pings it has already processed (`ping_count`).
struct Actor1 {
    /// Reference used to schedule an asynchronous self-ping on the second ping.
    self_ref: DynActorRef<AnActorMsg>,
    /// Node identifier passed to `actor_system.create` when spawning a new actor.
    node_id: String,
    /// Actor system handle used to create the spawned actor.
    actor_system: DynActorSystemHandle,
    /// Network client used to send a `Ping` to "localhost:5002".
    network_out: DynP2PNetworkClient,
    /// Number of pings processed so far; incremented at the end of every `receive`.
    ping_count: u8,
    /// Spawn generation: a new actor is spawned only while this is below 1, and the
    /// spawned handler is created with this value plus one.
    spawn_count: u8,
}
30
31impl Actor1 {
32 fn new(
33 self_ref: DynActorRef<AnActorMsg>,
34 node_id: impl Into<String>,
35 actor_system: DynActorSystemHandle,
36 network_out: DynP2PNetworkClient,
37 ) -> Actor1 {
38 Actor1 {
39 self_ref,
40 node_id: node_id.into(),
41 actor_system,
42 network_out,
43 ping_count: 0,
44 spawn_count: 0,
45 }
46 }
47}
48
49impl TypedMsgHandler<AnActorMsg> for Actor1 {
50 fn receive(&mut self, msg: AnActorMsg) -> Option<ActorControl> {
51 if (msg as Box<dyn Any>).downcast::<Ping>().is_ok() {
52 } else {
53 panic!("Not a Ping")
54 }
55 let ret = match self.ping_count {
56 0 => {
57 log::info!("Actor1 received first ping, sending ping to Actor2 over the network");
58 let mut out = self.network_out.clone();
59 let _ = out.attempt_send(
60 Box::new(Ping {}),
61 Box::new(|_msg| Ok(Vec::new())),
62 "localhost:5002".into(),
63 );
64 log::info!("Actor1 sent ping to Actor2 over the network");
65 None
66 }
67 1 => {
68 log::info!("Actor1 received second ping, self-pinging after async work");
69 self.self_ref.spawn_async_send(
70 Box::pin(async {
71 log::info!("Actor1 doing async work");
72 Box::new(Ping()) as AnActorMsg
73 }),
74 None,
75 );
76 None
77 }
78 _ => {
79 log::info!("Actor1 received third ping");
80 if self.spawn_count < 1 {
81 log::info!("Actor1 spawning");
82 let mut new_ref = self.actor_system.create(
83 self.node_id.clone(),
84 self.spawn_count.to_string(),
85 false,
86 );
87 log::info!("Actor1 setting handler");
88 new_ref.set_handler(Box::new(Actor1 {
89 self_ref: self.self_ref.clone(),
90 node_id: self.node_id.clone(),
91 actor_system: self.actor_system.clone(),
92 network_out: self.network_out.clone(),
93 ping_count: 3,
94 spawn_count: self.spawn_count + 1,
95 }));
96 log::info!("Actor1 sending to spawned actor");
97 new_ref.send(Box::new(Ping()), None);
98 log::info!("Actor1 done");
99 }
100 log::info!("Actor1 exiting");
101 Some(ActorControl::Exit())
102 }
103 };
104 self.ping_count += 1;
105 ret
106 }
107}
108
/// State of the second actor of the ping-pong example, generic over the network
/// client implementation it uses to reply.
#[derive(Debug)]
struct Actor2<P2PNetworkClientT>
where
    P2PNetworkClientT: P2PNetworkClient,
{
    /// Client used to send the received message back to "localhost:5001".
    network_out: P2PNetworkClientT,
}
116
117impl<P2PNetworkClientT> Actor2<P2PNetworkClientT>
118where
119 P2PNetworkClientT: P2PNetworkClient,
120{
121 fn new(network_out: P2PNetworkClientT) -> Self {
122 Actor2 { network_out }
123 }
124}
125
126impl<P2PNetworkClientT> TypedMsgHandler<AnActorMsg> for Actor2<P2PNetworkClientT>
127where
128 P2PNetworkClientT: P2PNetworkClient + Clone + Send + std::fmt::Debug + 'static,
129{
130 fn receive(&mut self, msg: AnActorMsg) -> Option<ActorControl> {
131 if (msg.clone() as Box<dyn Any>).downcast::<Ping>().is_ok() {
132 } else {
133 panic!("Not a Ping")
134 }
135 log::info!("Actor2 received ping over the network, replying with a ping over the network");
136 let mut out = self.network_out.clone();
137 let _ = out.attempt_send(msg, |_msg| Ok(Vec::new()), "localhost:5001");
138 log::info!("Actor2 sent ping reply, exiting");
139 Some(ActorControl::Exit())
140 }
141}
142
/// Handler for messages arriving at a node's network server; its
/// `UntypedMsgHandler` implementation forwards every message to one actor.
#[derive(Debug)]
struct NodeP2PNetworkInputHandler<ActorRefT>
where
    ActorRefT: ActorRef<AnActorMsg>,
{
    /// The actor that receives every incoming message.
    actor_ref: ActorRefT,
}
150
151impl<ActorRefT> UntypedMsgHandler for NodeP2PNetworkInputHandler<ActorRefT>
152where
153 ActorRefT: ActorRef<AnActorMsg>,
154{
155 fn receive_untyped(
156 &mut self,
157 message: AnActorMsg,
158 ) -> Result<Option<ActorControl>, MessageNotSupported> {
159 self.actor_ref.send(message, None);
160 Result::Ok(None)
161 }
162}
163
/// Runs the ping-pong example over real UDP sockets.
///
/// Sets up `Actor1` on a Tokio actor system and `Actor2` on a thread actor system,
/// starts one network server per node (bound to "localhost:5001" and
/// "localhost:5002"), sends the startup `Ping` to `Actor1`, and then joins both
/// actors and both actor systems.
///
/// # Panics
///
/// Panics with "Cannot bind" if either UDP socket cannot be bound, and if starting
/// either server returns an error (`unwrap`).
#[tokio::main]
async fn main() {
    utils::setup_logging(false);
    let async_runtime = AsyncRuntime::new("main", None);
    // Each client is configured with the other node's address.
    // NOTE(review): presumably the address list names the client's peers — confirm.
    let network1 = TokioP2PNetworkClient::new("network1", vec!["localhost:5002"], None);
    let network2 = TokioP2PNetworkClient::new("network2", vec!["localhost:5001"], None);
    let mut tokio_actor_system = TokioActorSystemHandle::new_actor_system("tokio-as", None, false);
    let mut thread_actor_system =
        ThreadActorSystemHandle::new_actor_system("thread-as", None, false);
    // Actor1 lives on the Tokio actor system and is handed a clone of its own
    // reference so that it can ping itself.
    let mut actor1_ref: bftgrid_mt::tokio::TokioActorRef<AnActorMsg> =
        tokio_actor_system.create("node1", "actor1", false);
    let actor1_ref_copy = actor1_ref.clone();
    actor1_ref.set_handler(Box::new(Actor1::new(
        Box::new(actor1_ref_copy),
        "node1",
        Box::new(tokio_actor_system.clone()),
        Box::new(network1.clone()),
    )));
    // Actor2 lives on the thread actor system.
    let mut actor2_ref: bftgrid_mt::thread::ThreadActorRef<AnActorMsg> =
        thread_actor_system.create("node2", "actor2", false);
    actor2_ref.set_handler(Box::new(Actor2::new(network2.clone())));
    // Node 1 server: listens on the address Actor2 replies to.
    let node1 = TokioP2PNetworkServer::new(
        "node1",
        async_runtime.block_on_async(async {
            UdpSocket::bind("localhost:5001")
                .await
                .expect("Cannot bind")
        }),
        None,
    );
    let node1_p2p_network_input_handler = NodeP2PNetworkInputHandler {
        actor_ref: actor1_ref.clone(),
    };
    // The closure ignores the received buffer and always yields a `Ping`.
    // The value returned by `start` is dropped right away.
    // NOTE(review): presumably the server keeps running after that drop — confirm.
    drop(
        node1
            .start(node1_p2p_network_input_handler, |_buf| Ok(Ping {}), 0)
            .unwrap(),
    );
    log::info!("Started node1");
    // Node 2 server: listens on the address Actor1 pings.
    let node2 = TokioP2PNetworkServer::new(
        "node2",
        async_runtime.block_on_async(async {
            UdpSocket::bind("localhost:5002")
                .await
                .expect("Cannot bind")
        }),
        None,
    );
    let node2_p2p_network_input_handler = NodeP2PNetworkInputHandler {
        actor_ref: actor2_ref.clone(),
    };
    drop(
        node2
            .start(node2_p2p_network_input_handler, |_buf| Ok(Ping {}), 0)
            .unwrap(),
    );
    log::info!("Started node2");
    // Kick off the exchange, then join the actors before their actor systems.
    actor1_ref.send(Box::new(Ping()), None);
    log::info!("Sent startup ping to actor1; joining actors, actor systems and exiting");
    actor2_ref.join();
    log::info!("Joined Actor2");
    actor1_ref.join();
    log::info!("Joined Actor1");
    tokio_actor_system.join();
    log::info!("Joined Tokio actor system");
    thread_actor_system.join();
    log::info!("Joined Thread actor system");
}
234
#[cfg(test)]
mod tests {
    use std::{
        collections::HashMap,
        ops::Add,
        time::{Duration, Instant},
    };

    use crate::utils;
    use bftgrid_core::actor::ActorSystemHandle;
    use bftgrid_sim::{NodeDescriptor, Simulation};

    use crate::{Actor1, Actor2, ActorRef, Ping};

    /// Runs the same ping-pong exchange inside a `Simulation` instead of over real
    /// sockets, and logs the history returned by `run`.
    #[test]
    fn simulation() {
        utils::setup_logging(true);

        // One topology entry per node address, each naming the actor behind it.
        let mut topology = HashMap::new();
        for (address, actor_name) in [("localhost:5001", "actor1"), ("localhost:5002", "actor2")] {
            topology.insert(
                address.into(),
                NodeDescriptor::new(None::<&str>, Some(actor_name)),
            );
        }

        // The simulation is given a window from now to 100 seconds from now.
        let begin = Instant::now();
        let deadline = begin.add(Duration::from_secs(100));
        let simulation = Simulation::new(topology, begin, deadline);

        // The simulation is used as both actor system and network client for Actor1.
        let mut actor1_ref = simulation.create("localhost:5001", "actor1", false);
        let actor1_self_ref = actor1_ref.clone();
        let actor1 = Actor1::new(
            Box::new(actor1_self_ref),
            "localhost:5001",
            Box::new(simulation.clone()),
            Box::new(simulation.clone()),
        );
        actor1_ref.set_handler(Box::new(actor1));

        let mut actor2_ref = simulation.create("localhost:5002", "actor2", false);
        let actor2 = Actor2::new(simulation.clone());
        actor2_ref.set_handler(Box::new(actor2));

        // Startup ping, then run the simulation and log what happened.
        actor1_ref.send(Box::new(Ping()), None);
        let history = simulation.run();
        log::info!("{:?}", history);
    }
}