bftgrid_network_example/
network.rs

1mod utils;
2
3use std::any::Any;
4
5use bftgrid_core::actor::{
6    ActorControl, ActorMsg, ActorRef, ActorSystemHandle, AnActorMsg, Joinable, MessageNotSupported,
7    P2PNetworkClient, TypedMsgHandler, UntypedMsgHandler,
8    erased::{DynActorRef, DynActorSystemHandle, DynP2PNetworkClient},
9};
10
11use bftgrid_mt::{
12    AsyncRuntime,
13    thread::ThreadActorSystemHandle,
14    tokio::{TokioActorSystemHandle, TokioP2PNetworkClient, TokioP2PNetworkServer},
15};
16use tokio::net::UdpSocket;
17
18#[derive(Clone, Debug)]
19struct Ping();
20impl ActorMsg for Ping {}
21
22struct Actor1 {
23    self_ref: DynActorRef<AnActorMsg>,
24    node_id: String,
25    actor_system: DynActorSystemHandle,
26    network_out: DynP2PNetworkClient,
27    ping_count: u8,
28    spawn_count: u8,
29}
30
31impl Actor1 {
32    fn new(
33        self_ref: DynActorRef<AnActorMsg>,
34        node_id: impl Into<String>,
35        actor_system: DynActorSystemHandle,
36        network_out: DynP2PNetworkClient,
37    ) -> Actor1 {
38        Actor1 {
39            self_ref,
40            node_id: node_id.into(),
41            actor_system,
42            network_out,
43            ping_count: 0,
44            spawn_count: 0,
45        }
46    }
47}
48
49impl TypedMsgHandler<AnActorMsg> for Actor1 {
50    fn receive(&mut self, msg: AnActorMsg) -> Option<ActorControl> {
51        if (msg as Box<dyn Any>).downcast::<Ping>().is_ok() {
52        } else {
53            panic!("Not a Ping")
54        }
55        let ret = match self.ping_count {
56            0 => {
57                log::info!("Actor1 received first ping, sending ping to Actor2 over the network");
58                let mut out = self.network_out.clone();
59                let _ = out.attempt_send(
60                    Box::new(Ping {}),
61                    Box::new(|_msg| Ok(Vec::new())),
62                    "localhost:5002".into(),
63                );
64                log::info!("Actor1 sent ping to Actor2 over the network");
65                None
66            }
67            1 => {
68                log::info!("Actor1 received second ping, self-pinging after async work");
69                self.self_ref.spawn_async_send(
70                    Box::pin(async {
71                        log::info!("Actor1 doing async work");
72                        Box::new(Ping()) as AnActorMsg
73                    }),
74                    None,
75                );
76                None
77            }
78            _ => {
79                log::info!("Actor1 received third ping");
80                if self.spawn_count < 1 {
81                    log::info!("Actor1 spawning");
82                    let mut new_ref = self.actor_system.create(
83                        self.node_id.clone(),
84                        self.spawn_count.to_string(),
85                        false,
86                    );
87                    log::info!("Actor1 setting handler");
88                    new_ref.set_handler(Box::new(Actor1 {
89                        self_ref: self.self_ref.clone(),
90                        node_id: self.node_id.clone(),
91                        actor_system: self.actor_system.clone(),
92                        network_out: self.network_out.clone(),
93                        ping_count: 3,
94                        spawn_count: self.spawn_count + 1,
95                    }));
96                    log::info!("Actor1 sending to spawned actor");
97                    new_ref.send(Box::new(Ping()), None);
98                    log::info!("Actor1 done");
99                }
100                log::info!("Actor1 exiting");
101                Some(ActorControl::Exit())
102            }
103        };
104        self.ping_count += 1;
105        ret
106    }
107}
108
109#[derive(Debug)]
110struct Actor2<P2PNetworkClientT>
111where
112    P2PNetworkClientT: P2PNetworkClient,
113{
114    network_out: P2PNetworkClientT,
115}
116
117impl<P2PNetworkClientT> Actor2<P2PNetworkClientT>
118where
119    P2PNetworkClientT: P2PNetworkClient,
120{
121    fn new(network_out: P2PNetworkClientT) -> Self {
122        Actor2 { network_out }
123    }
124}
125
126impl<P2PNetworkClientT> TypedMsgHandler<AnActorMsg> for Actor2<P2PNetworkClientT>
127where
128    P2PNetworkClientT: P2PNetworkClient + Clone + Send + std::fmt::Debug + 'static,
129{
130    fn receive(&mut self, msg: AnActorMsg) -> Option<ActorControl> {
131        if (msg.clone() as Box<dyn Any>).downcast::<Ping>().is_ok() {
132        } else {
133            panic!("Not a Ping")
134        }
135        log::info!("Actor2 received ping over the network, replying with a ping over the network");
136        let mut out = self.network_out.clone();
137        let _ = out.attempt_send(msg, |_msg| Ok(Vec::new()), "localhost:5001");
138        log::info!("Actor2 sent ping reply, exiting");
139        Some(ActorControl::Exit())
140    }
141}
142
143#[derive(Debug)]
144struct NodeP2PNetworkInputHandler<ActorRefT>
145where
146    ActorRefT: ActorRef<AnActorMsg>,
147{
148    actor_ref: ActorRefT,
149}
150
151impl<ActorRefT> UntypedMsgHandler for NodeP2PNetworkInputHandler<ActorRefT>
152where
153    ActorRefT: ActorRef<AnActorMsg>,
154{
155    fn receive_untyped(
156        &mut self,
157        message: AnActorMsg,
158    ) -> Result<Option<ActorControl>, MessageNotSupported> {
159        self.actor_ref.send(message, None);
160        Result::Ok(None)
161    }
162}
163
164// Components that need a Tokio runtime will reuse the one from the async context, if any,
165//  otherwise they will create a new one.
166#[tokio::main]
167async fn main() {
168    utils::setup_logging(false);
169    let async_runtime = AsyncRuntime::new("main", None);
170    let network1 = TokioP2PNetworkClient::new("network1", vec!["localhost:5002"], None);
171    let network2 = TokioP2PNetworkClient::new("network2", vec!["localhost:5001"], None);
172    let mut tokio_actor_system = TokioActorSystemHandle::new_actor_system("tokio-as", None, false);
173    let mut thread_actor_system =
174        ThreadActorSystemHandle::new_actor_system("thread-as", None, false);
175    let mut actor1_ref: bftgrid_mt::tokio::TokioActorRef<AnActorMsg> =
176        tokio_actor_system.create("node1", "actor1", false);
177    let actor1_ref_copy = actor1_ref.clone();
178    actor1_ref.set_handler(Box::new(Actor1::new(
179        Box::new(actor1_ref_copy),
180        "node1",
181        Box::new(tokio_actor_system.clone()),
182        Box::new(network1.clone()),
183    )));
184    let mut actor2_ref: bftgrid_mt::thread::ThreadActorRef<AnActorMsg> =
185        thread_actor_system.create("node2", "actor2", false);
186    actor2_ref.set_handler(Box::new(Actor2::new(network2.clone())));
187    let node1 = TokioP2PNetworkServer::new(
188        "node1",
189        async_runtime.block_on_async(async {
190            UdpSocket::bind("localhost:5001")
191                .await
192                .expect("Cannot bind")
193        }),
194        None,
195    );
196    let node1_p2p_network_input_handler = NodeP2PNetworkInputHandler {
197        actor_ref: actor1_ref.clone(),
198    };
199    drop(
200        node1
201            .start(node1_p2p_network_input_handler, |_buf| Ok(Ping {}), 0)
202            .unwrap(),
203    );
204    log::info!("Started node1");
205    let node2 = TokioP2PNetworkServer::new(
206        "node2",
207        async_runtime.block_on_async(async {
208            UdpSocket::bind("localhost:5002")
209                .await
210                .expect("Cannot bind")
211        }),
212        None,
213    );
214    let node2_p2p_network_input_handler = NodeP2PNetworkInputHandler {
215        actor_ref: actor2_ref.clone(),
216    };
217    drop(
218        node2
219            .start(node2_p2p_network_input_handler, |_buf| Ok(Ping {}), 0)
220            .unwrap(),
221    );
222    log::info!("Started node2");
223    actor1_ref.send(Box::new(Ping()), None);
224    log::info!("Sent startup ping to actor1; joining actors, actor systems and exiting");
225    actor2_ref.join();
226    log::info!("Joined Actor2");
227    actor1_ref.join();
228    log::info!("Joined Actor1");
229    tokio_actor_system.join();
230    log::info!("Joined Tokio actor system");
231    thread_actor_system.join();
232    log::info!("Joined Thread actor system");
233}
234
235#[cfg(test)]
236mod tests {
237    use std::{
238        collections::HashMap,
239        ops::Add,
240        time::{Duration, Instant},
241    };
242
243    use crate::utils;
244    use bftgrid_core::actor::ActorSystemHandle;
245    use bftgrid_sim::{NodeDescriptor, Simulation};
246
247    use crate::{Actor1, Actor2, ActorRef, Ping};
248
249    #[test]
250    fn simulation() {
251        utils::setup_logging(true);
252        let mut topology = HashMap::new();
253        topology.insert(
254            "localhost:5001".into(),
255            NodeDescriptor::new(None::<&str>, Some("actor1")),
256        );
257        topology.insert(
258            "localhost:5002".into(),
259            NodeDescriptor::new(None::<&str>, Some("actor2")),
260        );
261        let start = Instant::now();
262        let simulation = Simulation::new(topology, start, start.add(Duration::from_secs(100)));
263        let mut actor1_ref = simulation.create("localhost:5001", "actor1", false);
264        let actor1_ref_copy = actor1_ref.clone();
265        actor1_ref.set_handler(Box::new(Actor1::new(
266            Box::new(actor1_ref_copy),
267            "localhost:5001",
268            Box::new(simulation.clone()),
269            Box::new(simulation.clone()),
270        )));
271        let mut actor2_ref = simulation.create("localhost:5002", "actor2", false);
272        actor2_ref.set_handler(Box::new(Actor2::new(simulation.clone())));
273        actor1_ref.send(Box::new(Ping()), None);
274        let history = simulation.run();
275        log::info!("{:?}", history);
276    }
277}