From 31c17e908dc374f0df1d17d7389759573d826e65 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Tue, 25 Jun 2024 11:21:14 +0200 Subject: [PATCH] Move shared resources to static variables --- src/electrumclient.rs | 4 +- src/faucet.rs | 67 ++++++++++--------------- src/main.rs | 110 ++++++++++++++++++------------------------ src/message.rs | 77 +++++++---------------------- src/scan.rs | 24 ++++----- 5 files changed, 103 insertions(+), 179 deletions(-) diff --git a/src/electrumclient.rs b/src/electrumclient.rs index c0ff9d3..ddc1777 100644 --- a/src/electrumclient.rs +++ b/src/electrumclient.rs @@ -3,11 +3,11 @@ use log::info; const VALIDATE_DOMAIN: bool = false; // self-signed cert, so we don't validate -pub fn create_electrum_client(electrum_url: String) -> anyhow::Result { +pub fn create_electrum_client(electrum_url: &str) -> anyhow::Result { let config = ConfigBuilder::new() .validate_domain(VALIDATE_DOMAIN) .build(); - let electrum_client = Client::from_config(&electrum_url, config)?; + let electrum_client = Client::from_config(electrum_url, config)?; info!("ssl client {}", electrum_url); Ok(electrum_client) diff --git a/src/faucet.rs b/src/faucet.rs index bba871c..4d39649 100644 --- a/src/faucet.rs +++ b/src/faucet.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - str::FromStr, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, str::FromStr}; use bitcoincore_rpc::json::{self as bitcoin_json}; use sdk_common::sp_client::bitcoin::secp256k1::{ @@ -32,18 +28,13 @@ use sdk_common::sp_client::spclient::Recipient; use anyhow::{Error, Result}; -use crate::{ - daemon::Daemon, scan::compute_partial_tweak_to_transaction, MutexExt, SharedDaemon, - SilentPaymentWallet, FAUCET_AMT, -}; +use crate::{scan::compute_partial_tweak_to_transaction, MutexExt, DAEMON, FAUCET_AMT, WALLET}; -fn spend_from_core( - dest: XOnlyPublicKey, - daemon: Arc>, -) -> Result<(Transaction, Amount)> { - let core = daemon - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?; +fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> { + let core = DAEMON + .get() + .ok_or(Error::msg("DAEMON not initialized"))? + .lock_anyhow()?; let unspent_list: Vec = core.list_unspent_from_to(None)?; @@ -69,15 +60,11 @@ fn spend_from_core( } } -fn faucet_send( - sp_address: SilentPaymentAddress, - commitment: &str, - sp_wallet: Arc, - shared_daemon: SharedDaemon, -) -> Result { +fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result { let mut first_tx: Option = None; let final_tx: Transaction; + let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?; // do we have a sp output available ? let available_outpoints = sp_wallet.get_wallet()?.get_outputs().to_spendable_list(); @@ -103,7 +90,9 @@ fn faucet_send( nb_outputs: 1, }; - let fee_estimate = shared_daemon + let fee_estimate = DAEMON + .get() + .ok_or(Error::msg("DAEMON not initialized"))? .lock_anyhow()? .estimate_fee(6) .unwrap_or(Amount::from_sat(1000)) @@ -130,8 +119,7 @@ fn faucet_send( let keypair = Keypair::new(&secp, &mut thread_rng()); // we first spend from core to the pubkey we just created - let (core_tx, fee_rate) = - spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?; + let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0)?; // check that the first output of the transaction pays to the key we just created debug_assert!( @@ -237,44 +225,41 @@ fn faucet_send( final_tx = faucet_tx; } - if let Ok(daemon) = shared_daemon.lock() { + { + let daemon = DAEMON + .get() + .ok_or(Error::msg("DAEMON not initialized"))? + .lock_anyhow()?; // broadcast one or two transactions if first_tx.is_some() { daemon.broadcast(&first_tx.unwrap())?; } let txid = daemon.broadcast(&final_tx)?; log::debug!("Sent tx {}", txid); - } else { - return Err(Error::msg("Failed to lock daemon")); } Ok(final_tx) } -pub fn handle_faucet_request( - msg: &FaucetMessage, - sp_wallet: Arc, - shared_daemon: SharedDaemon, -) -> Result { +pub fn handle_faucet_request(msg: &FaucetMessage) -> Result { let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?; log::debug!("Sending bootstrap coins to {}", sp_address); // send bootstrap coins to this sp_address - let tx = faucet_send( - sp_address, - &msg.commitment, - sp_wallet.clone(), - shared_daemon.clone(), - )?; + let tx = faucet_send(sp_address, &msg.commitment)?; // get the tweak - let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; + let partial_tweak = compute_partial_tweak_to_transaction(&tx)?; // get current blockheight - let blkheight: u32 = shared_daemon + let blkheight: u32 = DAEMON + .get() + .unwrap() .lock_anyhow()? .get_current_height()? .try_into()?; + let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?; + // update our sp_client with the change output(s) sp_wallet .get_wallet()? diff --git a/src/main.rs b/src/main.rs index 8967b1c..0819a72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use std::{ net::SocketAddr, path::PathBuf, str::FromStr, - sync::{Arc, Mutex, MutexGuard}, + sync::{Mutex, MutexGuard, OnceLock}, }; use bitcoincore_rpc::json::{self as bitcoin_json}; @@ -50,9 +50,13 @@ use crate::{daemon::Daemon, scan::scan_blocks}; type Tx = UnboundedSender; -type PeerMap = Arc>>; +type PeerMap = Mutex>; -type SharedDaemon = Arc>; +pub(crate) static PEERMAP: OnceLock = OnceLock::new(); + +type SharedDaemon = Mutex; + +pub(crate) static DAEMON: OnceLock = OnceLock::new(); #[derive(Debug)] struct WalletFile { @@ -139,33 +143,35 @@ impl SilentPaymentWallet { } } -fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> { +pub(crate) static WALLET: OnceLock = OnceLock::new(); + +fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage) -> Result<()> { let tx = deserialize::(&Vec::from_hex(&new_tx_msg.transaction)?)?; - let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?; + let mempool_accept = DAEMON + .get() + .unwrap() + .lock_anyhow()? + .test_mempool_accept(&tx)?; if !mempool_accept.allowed { return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?; } if new_tx_msg.tweak_data.is_none() { // we add the tweak_data - let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; + let partial_tweak = compute_partial_tweak_to_transaction(&tx)?; new_tx_msg.tweak_data = Some(partial_tweak.to_string()); } // we try to broadcast it - shared_daemon.lock_anyhow()?.broadcast(&tx)?; + DAEMON.get().unwrap().lock_anyhow()?.broadcast(&tx)?; Ok(()) } -async fn handle_connection( - peers: PeerMap, - shared_daemon: SharedDaemon, - sp_wallet: Arc, - raw_stream: TcpStream, - addr: SocketAddr, -) { +async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) { debug!("Incoming TCP connection from: {}", addr); + let peers = PEERMAP.get().expect("Peer Map not initialized"); + let ws_stream = tokio_tungstenite::accept_async(raw_stream) .await .expect("Error during the websocket handshake occurred"); @@ -186,13 +192,7 @@ async fn handle_connection( let broadcast_incoming = incoming.try_for_each(|msg| { if let Ok(raw_msg) = msg.to_text() { debug!("Received msg: {}", raw_msg); - process_message( - raw_msg, - peers.clone(), - sp_wallet.clone(), - shared_daemon.clone(), - addr, - ); + process_message(raw_msg, addr); } else { debug!("Received non-text message {} from peer {}", msg, addr); } @@ -215,7 +215,7 @@ async fn handle_connection( peers.lock().unwrap().remove(&addr); } -fn create_new_tx_message(transaction: Vec, daemon: Arc>) -> Result { +fn create_new_tx_message(transaction: Vec) -> Result { // debug!("Creating tx message"); let tx: Transaction = deserialize(&transaction)?; @@ -223,14 +223,14 @@ fn create_new_tx_message(transaction: Vec, daemon: Arc>) -> Re return Err(Error::msg("Can't process coinbase transaction")); } - let partial_tweak = compute_partial_tweak_to_transaction(&tx, daemon)?; + let partial_tweak = compute_partial_tweak_to_transaction(&tx)?; Ok(NewTxMessage::new( transaction.to_lower_hex_string(), Some(partial_tweak.to_string()), )) } -async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String) { +async fn handle_zmq(zmq_url: String, electrum_url: String) { debug!("Starting listening on Core"); let mut socket = zeromq::SubSocket::new(); socket.connect(&zmq_url).await.unwrap(); @@ -250,7 +250,7 @@ async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String { debug!("topic: {}", std::str::from_utf8(&topic).unwrap()); match std::str::from_utf8(&topic) { - Ok("rawtx") => match create_new_tx_message(data.to_vec(), shared_daemon.clone()) { + Ok("rawtx") => match create_new_tx_message(data.to_vec()) { Ok(m) => { debug!("Created message"); serde_json::to_string(&m).expect("This shouldn't fail") @@ -272,9 +272,7 @@ async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String }; debug!("Broadcasting message {}", payload); - if let Err(e) = - broadcast_message(peers.clone(), AnkFlag::NewTx, payload, BroadcastType::ToAll) - { + if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) { log::error!("{}", e.to_string()); } } @@ -294,16 +292,22 @@ async fn main() -> Result<()> { .set(MessageCache::new()) .expect("Message Cache initialization failed"); - let peers = PeerMap::new(Mutex::new(HashMap::new())); + PEERMAP + .set(PeerMap::new(HashMap::new())) + .expect("PeerMap initialization failed"); // Connect the rpc daemon - let shared_daemon = Arc::new(Mutex::new(Daemon::connect( - config.core_wallet, - config.core_url, - config.network, - )?)); + DAEMON + .set(Mutex::new(Daemon::connect( + config.core_wallet, + config.core_url, + config.network, + )?)) + .expect("DAEMON initialization failed"); - let current_tip: u32 = shared_daemon + let current_tip: u32 = DAEMON + .get() + .unwrap() .lock_anyhow()? .get_current_height()? .try_into()?; @@ -358,32 +362,20 @@ async fn main() -> Result<()> { let last_scan = sp_wallet.get_outputs().get_last_scan(); - let shared_sp_wallet = Mutex::new(sp_wallet); - let shared_wallet_storage = Mutex::new(wallet_file); - - let sp_wallet = Arc::new(SilentPaymentWallet { - sp_wallet: shared_sp_wallet, - storage: shared_wallet_storage, - }); - - sp_wallet.save()?; + WALLET + .set(SilentPaymentWallet { + sp_wallet: Mutex::new(sp_wallet), + storage: Mutex::new(wallet_file), + }) + .expect("Failed to initialize WALLET"); if last_scan < current_tip { log::info!("Scanning for our outputs"); - scan_blocks( - shared_daemon.clone(), - sp_wallet.clone(), - current_tip - last_scan, - config.electrum_url, - )?; + scan_blocks(current_tip - last_scan, &config.electrum_url)?; } // Subscribe to Bitcoin Core - tokio::spawn(handle_zmq( - peers.clone(), - shared_daemon.clone(), - config.zmq_url, - )); + tokio::spawn(handle_zmq(config.zmq_url, config.electrum_url)); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind("127.0.0.1:9090").await; @@ -393,13 +385,7 @@ async fn main() -> Result<()> { // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { - tokio::spawn(handle_connection( - peers.clone(), - shared_daemon.clone(), - sp_wallet.clone(), - stream, - addr, - )); + tokio::spawn(handle_connection(stream, addr)); } Ok(()) diff --git a/src/message.rs b/src/message.rs index ec58234..4e7700a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Result}; use std::{ collections::HashMap, net::SocketAddr, - sync::{Arc, Mutex, OnceLock}, + sync::{Mutex, OnceLock}, time::{Duration, Instant}, }; use tokio::time; @@ -10,10 +10,7 @@ use tokio_tungstenite::tungstenite::Message; use sdk_common::network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}; -use crate::{ - daemon::Daemon, faucet::handle_faucet_request, handle_new_tx_request, PeerMap, - SilentPaymentWallet, -}; +use crate::{faucet::handle_faucet_request, handle_new_tx_request, PEERMAP}; pub(crate) static MESSAGECACHE: OnceLock = OnceLock::new(); @@ -84,11 +81,11 @@ pub(crate) enum BroadcastType { } pub(crate) fn broadcast_message( - peers: PeerMap, flag: AnkFlag, payload: String, broadcast: BroadcastType, ) -> Result<()> { + let peers = PEERMAP.get().ok_or(Error::msg("Unitialized peer map"))?; let ank_msg = AnkNetworkMsg { flag, content: payload, @@ -129,16 +126,10 @@ pub(crate) fn broadcast_message( Ok(()) } -fn process_faucet_message( - ank_msg: AnkNetworkMsg, - peers: PeerMap, - sp_wallet: Arc, - shared_daemon: Arc>, - addr: SocketAddr, -) { +fn process_faucet_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { log::debug!("Received a faucet message"); if let Ok(mut content) = serde_json::from_str::(&ank_msg.content) { - match handle_faucet_request(&content, sp_wallet.clone(), shared_daemon.clone()) { + match handle_faucet_request(&content) { Ok(new_tx_msg) => { log::debug!( "Obtained new_tx_msg: {}", @@ -149,12 +140,9 @@ fn process_faucet_message( log::error!("Failed to send faucet tx: {}", e); content.error = Some(e.into()); let payload = serde_json::to_string(&content).expect("Message type shouldn't fail"); - if let Err(e) = broadcast_message( - peers.clone(), - AnkFlag::Faucet, - payload, - BroadcastType::Sender(addr), - ) { + if let Err(e) = + broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr)) + { log::error!("Failed to broadcast message: {}", e); } } @@ -164,19 +152,13 @@ fn process_faucet_message( } } -fn process_new_tx_message( - ank_msg: AnkNetworkMsg, - peers: PeerMap, - shared_daemon: Arc>, - addr: SocketAddr, -) { +fn process_new_tx_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { log::debug!("Received a new tx message"); if let Ok(mut new_tx_msg) = serde_json::from_str::(&ank_msg.content) { - match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) { + match handle_new_tx_request(&mut new_tx_msg) { Ok(new_tx_msg) => { // Repeat the msg to all except sender if let Err(e) = broadcast_message( - peers.clone(), AnkFlag::NewTx, serde_json::to_string(&new_tx_msg).expect("This should not fail"), BroadcastType::ExcludeSender(addr), @@ -188,7 +170,6 @@ fn process_new_tx_message( log::error!("handle_new_tx_request returned error: {}", e); new_tx_msg.error = Some(e.into()); if let Err(e) = broadcast_message( - peers.clone(), AnkFlag::NewTx, serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"), BroadcastType::Sender(addr), @@ -202,16 +183,11 @@ fn process_new_tx_message( } } -fn process_cipher_message( - ank_msg: AnkNetworkMsg, - peers: PeerMap, - addr: SocketAddr, -) { +fn process_cipher_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { // For now we just send it to everyone log::debug!("Received a cipher message"); if let Err(e) = broadcast_message( - peers.clone(), AnkFlag::Cipher, ank_msg.content, BroadcastType::ExcludeSender(addr), @@ -220,14 +196,9 @@ fn process_cipher_message( } } -fn process_unknown_message( - ank_msg: AnkNetworkMsg, - peers: PeerMap, - addr: SocketAddr, -) { +fn process_unknown_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { log::debug!("Received an unknown message"); if let Err(e) = broadcast_message( - peers.clone(), AnkFlag::Unknown, ank_msg.content, BroadcastType::ExcludeSender(addr), @@ -236,13 +207,7 @@ fn process_unknown_message( } } -pub fn process_message( - raw_msg: &str, - peers: PeerMap, - sp_wallet: Arc, - shared_daemon: Arc>, - addr: SocketAddr, -) { +pub fn process_message(raw_msg: &str, addr: SocketAddr) { log::debug!("Received msg: {}", raw_msg); let cache = MESSAGECACHE.get().expect("Cache should be initialized"); if cache.contains(raw_msg) { @@ -253,18 +218,10 @@ pub fn process_message( } match serde_json::from_str::(raw_msg) { Ok(ank_msg) => match ank_msg.flag { - AnkFlag::Faucet => { - process_faucet_message(ank_msg, peers, sp_wallet, shared_daemon, addr) - } - AnkFlag::NewTx => { - process_new_tx_message(ank_msg, peers, shared_daemon, addr) - } - AnkFlag::Cipher => { - process_cipher_message(ank_msg, peers, addr) - } - AnkFlag::Unknown => { - process_unknown_message(ank_msg, peers, addr) - } + AnkFlag::Faucet => process_faucet_message(ank_msg, addr), + AnkFlag::NewTx => process_new_tx_message(ank_msg, addr), + AnkFlag::Cipher => process_cipher_message(ank_msg, addr), + AnkFlag::Unknown => process_unknown_message(ank_msg, addr), }, Err(_) => log::error!("Failed to parse network message"), } diff --git a/src/scan.rs b/src/scan.rs index c6a1b8f..2efff56 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::str::FromStr; -use std::sync::{Arc, Mutex}; use anyhow::{Error, Result}; use electrum_client::ElectrumApi; @@ -16,13 +15,10 @@ use sdk_common::sp_client::silentpayments::utils::receiving::{ use sdk_common::sp_client::spclient::{OutputSpendStatus, OwnedOutput}; use tokio::time::Instant; -use crate::daemon::Daemon; -use crate::{electrumclient, MutexExt, SharedDaemon, SilentPaymentWallet}; +use crate::{electrumclient, MutexExt, DAEMON, WALLET}; -pub fn compute_partial_tweak_to_transaction( - tx: &Transaction, - daemon: Arc>, -) -> Result { +pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result { + let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?; let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len()); let mut pubkeys: Vec = Vec::with_capacity(tx.input.len()); for input in tx.input.iter() { @@ -216,16 +212,16 @@ fn scan_block_inputs( Ok(found) } -pub fn scan_blocks( - shared_daemon: SharedDaemon, - sp_wallet: Arc, - mut n_blocks_to_scan: u32, - electrum_url: String, -) -> anyhow::Result<()> { +pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Result<()> { log::info!("Starting a rescan"); let electrum_client = electrumclient::create_electrum_client(electrum_url)?; - let core = shared_daemon.lock_anyhow()?; + let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?; + + let core = DAEMON + .get() + .ok_or(Error::msg("DAEMON not initialized"))? + .lock_anyhow()?; let secp = Secp256k1::new(); let scan_height = sp_wallet.get_wallet()?.get_outputs().get_last_scan();