From 306949e9f0a58bb3941912a9cf76980feca46303 Mon Sep 17 00:00:00 2001 From: Sosthene00 <674694@protonmail.ch> Date: Thu, 21 Mar 2024 18:07:22 +0100 Subject: [PATCH] Refactoring --- Cargo.lock | 2 +- src/main.rs | 179 ++++++++++++++++++++++++++++++---------------------- src/scan.rs | 64 +++++++------------ 3 files changed, 127 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0749449..193a55b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ dependencies = [ [[package]] name = "sp_backend" version = "0.1.0" -source = "git+https://github.com/Sosthene00/sp-backend?branch=sp_client#0213188a95921081f5c74e5099ac46e6737a07d0" +source = "git+https://github.com/Sosthene00/sp-backend?branch=sp_client#32967c214df9a25daef551a372b89c400f2369f8" dependencies = [ "anyhow", "bitcoin 0.31.1", diff --git a/src/main.rs b/src/main.rs index 4e0c9b0..4730f30 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,12 @@ use std::{ collections::HashMap, env, + fmt::Debug, net::SocketAddr, ops::Deref, path::PathBuf, str::FromStr, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, MutexGuard}, }; use bitcoincore_rpc::json::{self as bitcoin_json}; @@ -56,6 +57,8 @@ type Tx = UnboundedSender; type PeerMap = Arc>>; +type SharedDaemon = Arc>; + const FAUCET_AMT: Amount = Amount::from_sat(1000); pub(crate) trait MutexExt { @@ -216,19 +219,15 @@ fn find_owned_outputs( fn faucet_send( sp_address: SilentPaymentAddress, - sp_client: Arc>, - sp_outputs: Arc>, - daemon: Arc>, + sp_wallet: Arc, + shared_daemon: SharedDaemon, ) -> Result { let mut first_tx: Option = None; let final_tx: Transaction; let mut new_outpoints: HashMap; // do we have a sp output available ? - let available_outpoints = sp_outputs - .lock() - .map_err(|e| Error::msg(e.to_string()))? - .to_spendable_list(); + let available_outpoints = sp_wallet.get_outputs()?.to_spendable_list(); let available_amt = available_outpoints .iter() @@ -252,9 +251,8 @@ fn faucet_send( nb_outputs: 1, }; - let fee_estimate = daemon - .lock() - .map_err(|e| Error::msg(format!("{}", e.to_string())))? + let fee_estimate = shared_daemon + .lock_anyhow()? .estimate_fee(6)? .unwrap_or(Amount::from_sat(1000)) .checked_div(1000) @@ -262,7 +260,7 @@ fn faucet_send( log::debug!("fee estimate for 6 blocks: {}", fee_estimate); - let wallet = sp_client.lock().map_err(|e| Error::msg(e.to_string()))?; + let wallet = sp_wallet.get_client()?; let mut new_psbt = wallet.create_new_psbt(inputs.clone(), vec![recipient], None)?; log::debug!("Created psbt: {}", new_psbt); @@ -317,7 +315,8 @@ fn faucet_send( let keypair = Keypair::new(&secp, &mut thread_rng()); // we first spend from core to the pubkey we just created - let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0, daemon.clone())?; + let (core_tx, fee_rate) = + spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?; // check that the first output of the transaction pays to the key we just created assert!( @@ -352,10 +351,7 @@ fn faucet_send( .get(0) .expect("Failed to generate keys") .to_owned(); - let change_sp_address = sp_client - .lock() - .map_err(|e| Error::msg(format!("Failed to lock sp_client: {}", e.to_string())))? - .get_receiving_address(); + let change_sp_address = sp_wallet.get_client()?.get_receiving_address(); let change_output_key: XOnlyPublicKey = generate_recipient_pubkeys(vec![change_sp_address], partial_secret)? .into_values() @@ -410,9 +406,7 @@ fn faucet_send( first_tx = Some(core_tx); - let client = sp_client - .lock() - .map_err(|e| Error::msg(format!("{}", e.to_string())))?; + let client = sp_wallet.get_client()?; let input_pubkey = &keypair.public_key(); @@ -431,9 +425,9 @@ fn faucet_send( new_outpoints = find_owned_outputs(&final_tx, ours)?; } - if let Ok(core) = daemon.lock() { + if let Ok(daemon) = shared_daemon.lock() { // get current blockheight - let blkheight: u32 = core.get_current_height()?.try_into()?; + let blkheight: u32 = daemon.get_current_height()?.try_into()?; // update the new outpoints for o in new_outpoints.iter_mut() { o.1.blockheight = blkheight; @@ -441,34 +435,32 @@ fn faucet_send( // broadcast one or two transactions if first_tx.is_some() { - core.broadcast(&first_tx.unwrap())?; + daemon.broadcast(&first_tx.unwrap())?; } - core.broadcast(&final_tx)?; + daemon.broadcast(&final_tx)?; } else { return Err(Error::msg("Failed to lock daemon")); } // update our sp_client with the change output(s) - let mut outputs = sp_outputs - .lock() - .map_err(|e| Error::msg(format!("{}", e.to_string())))?; + let mut outputs = sp_wallet.get_outputs()?; outputs.extend_from(new_outpoints); - log::debug!("{:?}", outputs.to_outpoints_list()); + // save to disk + sp_wallet.get_storage()?.save(outputs.deref())?; Ok(final_tx.txid()) } fn handle_faucet_request( msg: &str, - sp_client: Arc>, - sp_outputs: Arc>, - daemon: Arc>, + sp_wallet: Arc, + shared_daemon: SharedDaemon, ) -> Result { if let Ok(sp_address) = SilentPaymentAddress::try_from(&msg["faucet".len()..]) { // send bootstrap coins to this sp_address - faucet_send(sp_address, sp_client, sp_outputs, daemon) + faucet_send(sp_address, sp_wallet, shared_daemon) } else { Err(Error::msg(format!( "faucet message with unparsable sp_address" @@ -477,12 +469,11 @@ fn handle_faucet_request( } async fn handle_connection( - peer_map: PeerMap, + peers: PeerMap, + shared_daemon: SharedDaemon, + sp_wallet: Arc, raw_stream: TcpStream, addr: SocketAddr, - sp_client: Arc>, - sp_outputs: Arc>, - daemon: Arc>, ) { debug!("Incoming TCP connection from: {}", addr); @@ -493,24 +484,29 @@ async fn handle_connection( // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded_channel(); - peer_map.lock().unwrap().insert(addr, tx); + match peers.lock_anyhow() { + Ok(mut peer_map) => peer_map.insert(addr, tx), + Err(e) => { + log::error!("{}", e); + panic!(); + } + }; let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming.try_for_each({ - let peer_map = peer_map.clone(); + let peers = peers.clone(); move |msg| { if msg.is_text() { if msg.to_string().starts_with("faucet") { match handle_faucet_request( &msg.to_string(), - sp_client.clone(), - sp_outputs.clone(), - daemon.clone(), + sp_wallet.clone(), + shared_daemon.clone(), ) { Ok(txid) => { if let Err(e) = broadcast_message( - peer_map.clone(), + peers.clone(), Message::Text(format!("faucet{}", txid.to_string())), BroadcastType::Sender(addr), ) { @@ -521,7 +517,7 @@ async fn handle_connection( } Err(e) => { if let Err(e) = broadcast_message( - peer_map.clone(), + peers.clone(), Message::Text(e.to_string()), BroadcastType::Sender(addr), ) { @@ -532,7 +528,7 @@ async fn handle_connection( } else { // we just send it `as is` to everyone except sender if let Err(e) = - broadcast_message(peer_map.clone(), msg, BroadcastType::ExcludeSender(addr)) + broadcast_message(peers.clone(), msg, BroadcastType::ExcludeSender(addr)) { log::error!("Failed to broadcast message: {}", e); } @@ -558,7 +554,7 @@ async fn handle_connection( future::select(broadcast_incoming, receive_from_others).await; debug!("{} disconnected", &addr); - peer_map.lock().unwrap().remove(&addr); + peers.lock().unwrap().remove(&addr); } fn flatten_msg(parts: &[Vec]) -> Vec { @@ -628,7 +624,11 @@ fn process_raw_tx_message( } } -async fn handle_zmq(peer_map: PeerMap, daemon: Arc>) { +async fn handle_zmq( + peers: PeerMap, + shared_daemon: SharedDaemon, + sp_wallet: Arc, +) { tokio::task::spawn_blocking(move || { debug!("Starting listening on Core"); for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() { @@ -643,22 +643,50 @@ async fn handle_zmq(peer_map: PeerMap, daemon: Arc>) { let payload: Vec = match core_msg.topic_str() { "rawtx" => { - let processed = process_raw_tx_message(&core_msg, daemon.clone()); + let processed = process_raw_tx_message(&core_msg, shared_daemon.clone()); match processed { Ok(p) => p, Err(_) => continue, } - } + }, + "rawblock" => { + // scan the block for our outputs + match scan_blocks(shared_daemon.clone(), sp_wallet.clone(), 1) { + Ok(()) => { + let updated = match sp_wallet.get_outputs() { + Ok(sp_outputs) => sp_outputs, + Err(e) => { + log::error!("{}", e); + continue; + } + }; + match sp_wallet.get_storage() { + Ok(storage) => { + if let Err(e) = storage.save(updated.deref()) { + log::error!("{}", e); + } + } + Err(e) => { + log::error!("{}", e); + } + } + } + Err(e) => log::error!("{}", e), + }; + + flatten_msg(&core_msg.serialize_to_vecs()) + }, _ => flatten_msg(&core_msg.serialize_to_vecs()), }; if let Err(e) = broadcast_message( - peer_map.clone(), + peers.clone(), Message::Binary(payload), BroadcastType::ToAll, ) { log::error!("{}", e.to_string()); } + } }); } @@ -678,16 +706,15 @@ async fn main() -> Result<()> { .expect("Please provide either \"true\" or \"false\""); let core_wallet: Option = env::args().nth(4); - let state = PeerMap::new(Mutex::new(HashMap::new())); + let peers = PeerMap::new(Mutex::new(HashMap::new())); // Connect the rpc daemon - let daemon = Daemon::connect(core_wallet).unwrap(); + let shared_daemon = Arc::new(Mutex::new(Daemon::connect(core_wallet)?)); - let current_tip: u32 = daemon - .get_current_height() - .expect("Failed to make rpc call") - .try_into() - .expect("block count is higher than u32::MAX"); + let current_tip: u32 = shared_daemon + .lock_anyhow()? + .get_current_height()? + .try_into()?; let mut config_dir = PathBuf::from_str(&env::var("HOME")?)?; config_dir.push(".4nk"); @@ -753,30 +780,31 @@ async fn main() -> Result<()> { let last_scan = sp_outputs.get_last_scan(); - let shared_daemon = Arc::new(Mutex::new(daemon)); - let shared_sp_client = Arc::new(Mutex::new(sp_client)); - let shared_sp_outputs = Arc::new(Mutex::new(sp_outputs)); + let shared_sp_client = Mutex::new(sp_client); + let shared_sp_outputs = Mutex::new(sp_outputs); + let shared_outputs_storage = Mutex::new(sp_outputs_file); + + let sp_wallet = Arc::new(SilentPaymentWallet { + sp_client: shared_sp_client, + sp_outputs: shared_sp_outputs, + storage: shared_outputs_storage, + }); if last_scan < current_tip { log::info!("Scanning for our outputs"); - match scan_blocks( - shared_sp_client.clone(), + scan_blocks( shared_daemon.clone(), - shared_sp_outputs.clone(), + sp_wallet.clone(), current_tip - last_scan, - ) { - Ok(()) => { - let updated = shared_sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?; - sp_outputs_file.save(updated.deref())?; - } - Err(e) => return Err(e), - }; + )?; } // Subscribe to Bitcoin Core - tokio::spawn(handle_zmq(state.clone(), shared_daemon.clone())); + tokio::spawn(handle_zmq( + peers.clone(), + shared_daemon.clone(), + sp_wallet.clone(), + )); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; @@ -786,12 +814,11 @@ async fn main() -> Result<()> { // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection( - state.clone(), + peers.clone(), + shared_daemon.clone(), + sp_wallet.clone(), stream, addr, - shared_sp_client.clone(), - shared_sp_outputs.clone(), - shared_daemon.clone(), )); } diff --git a/src/scan.rs b/src/scan.rs index 1b9b7d2..7e25403 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; +use std::ops::Deref; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::{Error, Result}; use electrum_client::ElectrumApi; @@ -9,11 +10,12 @@ use sp_backend::bitcoin::bip158::BlockFilter; use sp_backend::bitcoin::hex::DisplayHex; use sp_backend::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey}; use sp_backend::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey}; +use sp_backend::db::Storage; use sp_backend::silentpayments::receiving::Receiver; -use sp_backend::spclient::{OutputList, OutputSpendStatus, OwnedOutput, SpClient}; +use sp_backend::spclient::{OutputSpendStatus, OwnedOutput}; use tokio::time::Instant; -use crate::{electrumclient, Daemon}; +use crate::{electrumclient, MutexExt, SharedDaemon, SilentPaymentWallet}; fn get_script_to_secret_map( sp_receiver: &Receiver, @@ -172,23 +174,17 @@ fn scan_block_inputs( } pub fn scan_blocks( - sp_client: Arc>, - daemon: Arc>, - sp_outputs: Arc>, + shared_daemon: SharedDaemon, + sp_wallet: Arc, mut n_blocks_to_scan: u32, ) -> anyhow::Result<()> { log::info!("Starting a rescan"); let electrum_client = electrumclient::create_electrum_client()?; - let core = daemon - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?; + let core = shared_daemon.lock_anyhow()?; let secp = Secp256k1::new(); - let scan_height = sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))? - .get_last_scan(); + let scan_height = sp_wallet.get_outputs()?.get_last_scan(); let tip_height: u32 = core.get_current_height()?.try_into()?; // 0 means scan to tip @@ -215,16 +211,9 @@ pub fn scan_blocks( let mut tweak_data_map = electrum_client.sp_tweaks(start as usize)?; - let scan_sk = sp_client - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))? - .get_scan_key(); + let scan_sk = sp_wallet.get_client()?.get_scan_key(); - let sp_receiver = sp_client - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))? - .sp_receiver - .clone(); + let sp_receiver = sp_wallet.get_client()?.sp_receiver.clone(); let start_time = Instant::now(); for (blkheight, blkhash, blkfilter) in filters { @@ -239,10 +228,8 @@ pub fn scan_blocks( let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect(); // check if owned inputs are spent - let our_outputs: HashMap = sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))? - .to_outpoints_list(); + let our_outputs: HashMap = + sp_wallet.get_outputs()?.to_outpoints_list(); let owned_spks: Result>> = our_outputs .iter() @@ -261,25 +248,18 @@ pub fn scan_blocks( let utxo_created_in_block = scan_block_outputs(&sp_receiver, &blk.txdata, blkheight.into(), spk2secret)?; if !utxo_created_in_block.is_empty() { - sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))? - .extend_from(utxo_created_in_block); + sp_wallet.get_outputs()?.extend_from(utxo_created_in_block); } // update the list of outputs just in case // utxos may be created and destroyed in the same block - let updated_outputs: HashMap = sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))? - .to_outpoints_list(); + let updated_outputs: HashMap = + sp_wallet.get_outputs()?.to_outpoints_list(); // search inputs and mark as mined let utxo_destroyed_in_block = scan_block_inputs(updated_outputs, blk.txdata)?; if !utxo_destroyed_in_block.is_empty() { - let mut outputs = sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?; + let mut outputs = sp_wallet.get_outputs()?; for outpoint in utxo_destroyed_in_block { outputs.mark_mined(outpoint, blkhash)?; } @@ -294,9 +274,11 @@ pub fn scan_blocks( ); // update last_scan height - sp_outputs - .lock() - .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))? - .update_last_scan(end); + let mut updated = sp_wallet.get_outputs()?; + + updated.update_last_scan(end); + + sp_wallet.get_storage()?.save(updated.deref())?; + Ok(()) }