diff --git a/src/daemon.rs b/src/daemon.rs index 661bc5f..a3d8c80 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -294,7 +294,10 @@ impl Daemon { Ok(blockchain_info.chain) } - pub(crate) fn test_mempool_accept(&self, tx: &Transaction) -> Result { + pub(crate) fn test_mempool_accept( + &self, + tx: &Transaction, + ) -> Result { let res = self.rpc.test_mempool_accept(&vec![tx])?; Ok(res.get(0).unwrap().clone()) diff --git a/src/main.rs b/src/main.rs index 6d724b5..5b6cdea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,8 @@ use std::{ fmt::Debug, net::SocketAddr, str::FromStr, - sync::{Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard, OnceLock}, + time::{Duration, Instant}, }; use bitcoincore_rpc::json::{self as bitcoin_json}; @@ -29,7 +30,7 @@ use sp_client::silentpayments::sending::{generate_recipient_pubkeys, SilentPayme use sp_client::silentpayments::utils::receiving::{calculate_tweak_data, get_pubkey_from_input}; use sp_client::silentpayments::utils::sending::calculate_partial_secret; use sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{net::{TcpListener, TcpStream}, time}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_tungstenite::tungstenite::Message; @@ -49,6 +50,66 @@ type PeerMap = Arc>>; type SharedDaemon = Arc>; +static MESSAGECACHE: OnceLock = OnceLock::new(); + +const MESSAGECACHEDURATION: Duration = Duration::from_secs(10); +const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(2); + +#[derive(Debug)] +struct MessageCache { + store: Mutex>, +} + +impl MessageCache { + fn new() -> Self { + Self { + store: Mutex::new(HashMap::new()), + } + } + + fn insert(&self, key: String) { + let mut store = self.store.lock().unwrap(); + store.insert(key.clone(), Instant::now()); + } + + fn contains(&self, key: &str) -> bool { + let store = self.store.lock().unwrap(); + store.contains_key(key) + } +} + +async fn clean_up() { + let cache = MESSAGECACHE.get().unwrap(); + + let mut interval = time::interval(MESSAGECACHEINTERVAL); + + loop { + interval.tick().await; + + let mut store = cache.store.lock().unwrap(); + + let now = Instant::now(); + let to_rm: Vec = store + .iter() + .filter_map(|(entry, entrytime)| { + if let Some(duration) = now.checked_duration_since(*entrytime) { + if duration > MESSAGECACHEDURATION { + Some(entry.clone()) + } else { + None + } + } else { + None + } + }) + .collect(); + + for key in to_rm { + store.remove(&key); + } + } +} + const FAUCET_AMT: Amount = Amount::from_sat(100_000); pub(crate) trait MutexExt { @@ -433,12 +494,20 @@ async fn handle_connection( let peers = peers.clone(); if let Ok(raw_msg) = msg.to_text() { debug!("Received msg: {}", raw_msg); + let cache = MESSAGECACHE.get().expect("Cache should be initialized"); + if cache.contains(raw_msg) { + debug!("Message already processed, dropping"); + return future::ok(()); + } else { + cache.insert(raw_msg.to_owned()); + } let parsed = serde_json::from_str::(raw_msg); match parsed { Ok(ank_msg) => match ank_msg.flag { AnkFlag::Faucet => { debug!("Received a faucet message"); - if let Ok(mut content) = serde_json::from_str::(&ank_msg.content) + if let Ok(mut content) = + serde_json::from_str::(&ank_msg.content) { match handle_faucet_request( &content, @@ -454,7 +523,8 @@ async fn handle_connection( Err(e) => { log::error!("Failed to send faucet tx: {}", e); content.error = Some(e.into()); - let payload = serde_json::to_string(&content).expect("Message type shouldn't fail"); + let payload = serde_json::to_string(&content) + .expect("Message type shouldn't fail"); if let Err(e) = broadcast_message( peers.clone(), AnkFlag::Faucet, @@ -471,7 +541,9 @@ async fn handle_connection( } AnkFlag::NewTx => { debug!("Received a new tx message"); - if let Ok(mut new_tx_msg) = serde_json::from_str::(&ank_msg.content) { + if let Ok(mut new_tx_msg) = + serde_json::from_str::(&ank_msg.content) + { match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) { Ok(new_tx_msg) => { // Repeat the msg to all except sender @@ -491,7 +563,8 @@ async fn handle_connection( if let Err(e) = broadcast_message( peers.clone(), AnkFlag::NewTx, - serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"), + serde_json::to_string(&new_tx_msg) + .expect("This shouldn't fail"), BroadcastType::Sender(addr), ) { log::error!("Failed to broadcast message: {}", e); @@ -672,6 +745,10 @@ async fn main() -> Result<()> { .expect("Please provide either \"true\" or \"false\""); let core_wallet: Option = env::args().nth(4); + MESSAGECACHE.set(MessageCache::new()).expect("Message Cache initialization failed"); + + tokio::spawn(clean_up()); + let peers = PeerMap::new(Mutex::new(HashMap::new())); // Connect the rpc daemon