use anyhow::{Error, Result}; use std::{ collections::HashMap, net::SocketAddr, sync::{Mutex, OnceLock}, time::{Duration, Instant}, }; use tokio::time; use tokio_tungstenite::tungstenite::Message; use sdk_common::network::{AnkFlag, Envelope, FaucetMessage, NewTxMessage}; use crate::{faucet::handle_faucet_request, handle_new_tx_request, PEERMAP}; pub(crate) static MESSAGECACHE: OnceLock = OnceLock::new(); const MESSAGECACHEDURATION: Duration = Duration::from_secs(10); const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(2); #[derive(Debug)] pub(crate) struct MessageCache { store: Mutex>, } impl MessageCache { pub fn new() -> Self { Self { store: Mutex::new(HashMap::new()), } } fn insert(&self, key: String) { let mut store = self.store.lock().unwrap(); store.insert(key.clone(), Instant::now()); } fn contains(&self, key: &str) -> bool { let store = self.store.lock().unwrap(); store.contains_key(key) } pub async fn clean_up() { let cache = MESSAGECACHE.get().unwrap(); let mut interval = time::interval(MESSAGECACHEINTERVAL); loop { interval.tick().await; let mut store = cache.store.lock().unwrap(); let now = Instant::now(); let to_rm: Vec = store .iter() .filter_map(|(entry, entrytime)| { if let Some(duration) = now.checked_duration_since(*entrytime) { if duration > MESSAGECACHEDURATION { Some(entry.clone()) } else { None } } else { None } }) .collect(); for key in to_rm { store.remove(&key); } } } } pub(crate) enum BroadcastType { Sender(SocketAddr), #[allow(dead_code)] ExcludeSender(SocketAddr), #[allow(dead_code)] ToAll, } pub(crate) fn broadcast_message( flag: AnkFlag, payload: String, broadcast: BroadcastType, ) -> Result<()> { let peers = PEERMAP.get().ok_or(Error::msg("Unitialized peer map"))?; let ank_msg = Envelope { flag, content: payload, }; let msg = Message::Text(serde_json::to_string(&ank_msg)?); log::debug!("Broadcasting message: {}", msg); match broadcast { BroadcastType::Sender(addr) => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .find(|(peer_addr, _)| peer_addr == &&addr) .ok_or(Error::msg("Failed to find the sender in the peer_map"))? .1 .send(msg)?; } BroadcastType::ExcludeSender(addr) => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .filter(|(peer_addr, _)| peer_addr != &&addr) .for_each(|(_, peer_tx)| { let _ = peer_tx.send(msg.clone()); }); } BroadcastType::ToAll => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .for_each(|(_, peer_tx)| { let _ = peer_tx.send(msg.clone()); }); } } Ok(()) } fn process_faucet_message(ank_msg: Envelope, addr: SocketAddr) { log::debug!("Received a faucet message"); if let Ok(mut content) = serde_json::from_str::(&ank_msg.content) { match handle_faucet_request(&content) { Ok(new_tx_msg) => { log::debug!( "Obtained new_tx_msg: {}", serde_json::to_string(&new_tx_msg).unwrap() ); } Err(e) => { log::error!("Failed to send faucet tx: {}", e); content.error = Some(e.into()); let payload = serde_json::to_string(&content).expect("Message type shouldn't fail"); if let Err(e) = broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr)) { log::error!("Failed to broadcast message: {}", e); } } } } else { log::error!("Invalid content for faucet message"); } } fn process_new_tx_message(ank_msg: Envelope, addr: SocketAddr) { log::debug!("Received a new tx message"); if let Ok(mut new_tx_msg) = serde_json::from_str::(&ank_msg.content) { match handle_new_tx_request(&mut new_tx_msg) { Ok(new_tx_msg) => { // Repeat the msg to all except sender if let Err(e) = broadcast_message( AnkFlag::NewTx, serde_json::to_string(&new_tx_msg).expect("This should not fail"), BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } Err(e) => { log::error!("handle_new_tx_request returned error: {}", e); new_tx_msg.error = Some(e.into()); if let Err(e) = broadcast_message( AnkFlag::NewTx, serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"), BroadcastType::Sender(addr), ) { log::error!("Failed to broadcast message: {}", e); } } } } else { log::error!("Invalid content for new_tx message"); } } fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) { // For now we just send it to everyone log::debug!("Received a cipher message"); if let Err(e) = broadcast_message( AnkFlag::Cipher, ank_msg.content, BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } fn process_unknown_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { log::debug!("Received an unknown message"); if let Err(e) = broadcast_message( AnkFlag::Unknown, ank_msg.content, BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } pub fn process_message(raw_msg: &str, addr: SocketAddr) { log::debug!("Received msg: {}", raw_msg); let cache = MESSAGECACHE.get().expect("Cache should be initialized"); if cache.contains(raw_msg) { log::debug!("Message already processed, dropping"); return; } else { cache.insert(raw_msg.to_owned()); } match serde_json::from_str::(raw_msg) { Ok(ank_msg) => match ank_msg.flag { AnkFlag::Faucet => process_faucet_message(ank_msg, addr), AnkFlag::NewTx => process_new_tx_message(ank_msg, addr), AnkFlag::Cipher => process_cipher_message(ank_msg, addr), AnkFlag::Unknown => process_unknown_message(ank_msg, addr), }, Err(_) => log::error!("Failed to parse network message"), } }