diff --git a/src/commit.rs b/src/commit.rs index b16b022..bef6cdc 100644 --- a/src/commit.rs +++ b/src/commit.rs @@ -6,13 +6,17 @@ use std::{ use anyhow::{Error, Result}; use bitcoincore_rpc::bitcoin::hex::DisplayHex; -use sdk_common::{pcd::Member, silentpayments::sign_transaction, sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress}}; +use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage}; +use sdk_common::process::{lock_processes, Process, ProcessState}; use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap}; use sdk_common::silentpayments::create_transaction; -use sdk_common::sp_client::{FeeRate, Recipient}; -use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage}; use sdk_common::sp_client::bitcoin::{Amount, OutPoint}; -use sdk_common::process::{lock_processes, Process, ProcessState}; +use sdk_common::sp_client::{FeeRate, Recipient}; +use sdk_common::{ + pcd::Member, + silentpayments::sign_transaction, + sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress}, +}; use crate::message::{broadcast_message, BroadcastType}; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; @@ -24,10 +28,7 @@ pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result Result Result Result { let mut new_member_map = HashMap::new(); new_member_map.insert(pairing_process_id, new_member.clone()); let init_msg = HandshakeMessage::new( - our_sp_address.into(), + our_sp_address.into(), OutPointMemberMap(new_member_map), OutPointProcessMap(HashMap::new()), ); if let Err(e) = broadcast_message( - AnkFlag::Handshake, - format!("{}", init_msg.to_string()), - BroadcastType::ToAll - ) - { + AnkFlag::Handshake, + format!("{}", init_msg.to_string()), + BroadcastType::ToAll, + ) { log::error!("Failed to send handshake message: {}", e); } } else { - log::error!("Failed to find new member with process id {}", pairing_process_id); + log::error!( + "Failed to find new member with process id {}", + pairing_process_id + ); } } @@ -105,7 +107,7 @@ fn handle_new_process(commit_msg: &CommitMessage) -> Result { let init_state = ProcessState { commited_in: commit_msg.process_id, roles: commit_msg.roles.clone(), - pcd_commitment: commit_msg.pcd_commitment.clone(), + pcd_commitment: commit_msg.pcd_commitment.clone(), state_id: merkle_root_bin, public_data: commit_msg.public_data.clone(), ..Default::default() @@ -134,32 +136,41 @@ fn handle_member_list(commit_msg: &CommitMessage) -> Result { if !pairing_role.members.is_empty() { return Err(Error::msg("Process is not a pairing process")); } - } else { + } else { return Err(Error::msg("Process is not a pairing process")); } if let Some(paired_addresses) = commit_msg.public_data.get("pairedAddresses") { - let paired_addresses: Vec = sdk_common::serialization::ciborium_deserialize(paired_addresses)?; + let paired_addresses: Vec = + sdk_common::serialization::ciborium_deserialize(paired_addresses)?; let mut memberlist = lock_members()?; - memberlist.insert( - commit_msg.process_id, - Member::new(paired_addresses) - ); + memberlist.insert(commit_msg.process_id, Member::new(paired_addresses)); return Ok(commit_msg.process_id); } Err(Error::msg("Process is not a pairing process")) } -fn handle_existing_commitment(process_to_udpate: &mut Process, commit_msg: &CommitMessage) -> Result<()> { +fn handle_existing_commitment( + process_to_udpate: &mut Process, + commit_msg: &CommitMessage, +) -> Result<()> { let process_id = process_to_udpate.get_process_id()?; match register_new_state(process_to_udpate, &commit_msg) { - Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", process_id, new_state_id.to_lower_hex_string()), - Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id) + Ok(new_state_id) => log::debug!( + "Registering new state for process {} with state id {}", + process_id, + new_state_id.to_lower_hex_string() + ), + Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id), } if commit_msg.validation_tokens.len() > 0 { - log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), process_id); + log::debug!( + "Received commit_msg with {} validation tokens for process {}", + commit_msg.validation_tokens.len(), + process_id + ); // If the validation succeed, we return a new tip process_validation(process_to_udpate, commit_msg)?; } @@ -169,8 +180,11 @@ fn handle_existing_commitment(process_to_udpate: &mut Process, commit_msg: &Comm pub fn dump_cached_members() -> Result<(), anyhow::Error> { let members = lock_members()?.clone(); - - let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?; + + let storage = STORAGE + .get() + .ok_or(Error::msg("STORAGE is not initialized"))? + .lock_anyhow()?; let members_file = &storage.members_file; @@ -184,7 +198,10 @@ pub fn dump_cached_members() -> Result<(), anyhow::Error> { } pub fn dump_cached_processes(processes: HashMap) -> Result<(), anyhow::Error> { - let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?; + let storage = STORAGE + .get() + .ok_or(Error::msg("STORAGE is not initialized"))? + .lock_anyhow()?; let processes_file = &storage.processes_file; @@ -201,11 +218,18 @@ pub fn dump_cached_processes(processes: HashMap) -> Result<() fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Result<[u8; 32]> { let last_commited_state = process.get_latest_commited_state(); - let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().unwrap(); + let new_state_id = commit_msg + .pcd_commitment + .create_merkle_tree()? + .root() + .unwrap(); if let Some(state) = last_commited_state { if new_state_id == state.state_id { - return Err(Error::msg(format!("{}", new_state_id.to_lower_hex_string()))); + return Err(Error::msg(format!( + "{}", + new_state_id.to_lower_hex_string() + ))); } } @@ -213,13 +237,15 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu let (empty_state, actual_states) = concurrent_states.split_last().unwrap(); let current_outpoint = empty_state.commited_in; - // Ensure no duplicate states if actual_states .iter() .any(|state| state.state_id == new_state_id) { - return Err(Error::msg(format!("{}", new_state_id.to_lower_hex_string()))); + return Err(Error::msg(format!( + "{}", + new_state_id.to_lower_hex_string() + ))); } // Add the new state @@ -237,30 +263,39 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu } // Process validation for a state with validation tokens -fn process_validation(updated_process: &mut Process, commit_msg: &CommitMessage) -> Result { +fn process_validation( + updated_process: &mut Process, + commit_msg: &CommitMessage, +) -> Result { let new_state_id = if commit_msg.pcd_commitment.is_empty() { // We're dealing with an obliteration attempt [0u8; 32] - } - else { - commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))? + } else { + commit_msg + .pcd_commitment + .create_merkle_tree()? + .root() + .ok_or(Error::msg("Invalid merkle tree"))? }; { - let state_to_update = updated_process - .get_state_for_id_mut(&new_state_id)?; + let state_to_update = updated_process.get_state_for_id_mut(&new_state_id)?; // Complete with the received tokens - state_to_update.validation_tokens.extend(commit_msg.validation_tokens.iter()); + state_to_update + .validation_tokens + .extend(commit_msg.validation_tokens.iter()); state_to_update.validation_tokens.sort_unstable(); state_to_update.validation_tokens.dedup(); } - let state_to_validate = updated_process - .get_state_for_id(&new_state_id)?; + let state_to_validate = updated_process.get_state_for_id(&new_state_id)?; let members = lock_members()?.clone(); - state_to_validate.is_valid(updated_process.get_latest_commited_state(), &OutPointMemberMap(members))?; + state_to_validate.is_valid( + updated_process.get_latest_commited_state(), + &OutPointMemberMap(members), + )?; let commited_in = commit_new_transaction(updated_process, state_to_validate.clone())?; @@ -291,16 +326,18 @@ fn commit_new_transaction( if let Some(member) = lock_members()?.get(&updated_process.get_process_id().unwrap()) { // We just pick one of the devices of the member at random en pay to it, member can then share the private key between all devices // For now we take the first address - let address: SilentPaymentAddress = member.get_addresses().get(0).unwrap().as_str().try_into()?; - recipients.push(Recipient { - address: RecipientAddress::SpAddress(address), - amount: Amount::from_sat(1000), + let address: SilentPaymentAddress = + member.get_addresses().get(0).unwrap().as_str().try_into()?; + recipients.push(Recipient { + address: RecipientAddress::SpAddress(address), + amount: Amount::from_sat(1000), }); } // This output is used to generate publicly available public keys without having to go through too many loops let daemon = DAEMON.get().unwrap().lock_anyhow()?; - let fee_rate = daemon.estimate_fee(6) + let fee_rate = daemon + .estimate_fee(6) .unwrap_or(Amount::from_sat(1000)) .checked_div(1000) .unwrap(); @@ -309,7 +346,10 @@ fn commit_new_transaction( let next_commited_in = updated_process.get_process_tip()?; if !freezed_utxos.contains(&next_commited_in) { - return Err(Error::msg(format!("Missing next commitment outpoint for process {}", updated_process.get_process_id()?))); + return Err(Error::msg(format!( + "Missing next commitment outpoint for process {}", + updated_process.get_process_id()? + ))); }; let unspent_outputs = sp_wallet.get_unspent_outputs(); @@ -354,8 +394,14 @@ fn commit_new_transaction( mod tests { use super::*; + use crate::daemon::RpcCall; + use crate::DiskStorage; + use crate::StateFile; use bitcoincore_rpc::bitcoin::consensus::deserialize; use bitcoincore_rpc::bitcoin::hex::DisplayHex; + use bitcoincore_rpc::bitcoin::*; + use mockall::mock; + use mockall::predicate::*; use sdk_common::pcd::Member; use sdk_common::pcd::Pcd; use sdk_common::pcd::PcdCommitments; @@ -363,23 +409,17 @@ mod tests { use sdk_common::pcd::Roles; use sdk_common::pcd::ValidationRule; use sdk_common::process::CACHEDPROCESSES; + use sdk_common::sp_client::bitcoin::consensus::serialize; + use sdk_common::sp_client::bitcoin::hex::FromHex; use sdk_common::sp_client::silentpayments::SilentPaymentAddress; - use mockall::predicate::*; - use mockall::mock; use serde_json::json; + use serde_json::{Map, Value}; use std::collections::BTreeMap; use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; - use bitcoincore_rpc::bitcoin::*; - use crate::daemon::RpcCall; - use crate::DiskStorage; - use crate::StateFile; use std::sync::OnceLock; - use sdk_common::sp_client::bitcoin::consensus::serialize; - use sdk_common::sp_client::bitcoin::hex::FromHex; - use serde_json::{Map, Value}; const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v"; const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000"; @@ -487,17 +527,24 @@ mod tests { pub fn initialize_static_variables() { if DAEMON.get().is_none() { let mut daemon = MockDaemon::new(); - daemon.expect_broadcast() + daemon + .expect_broadcast() .withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION) .returning(|tx| Ok(tx.txid())); - DAEMON.set(Mutex::new(Box::new(daemon))).expect("DAEMON should only be initialized once"); + DAEMON + .set(Mutex::new(Box::new(daemon))) + .expect("DAEMON should only be initialized once"); println!("Initialized DAEMON"); } if WALLET.get().is_none() { let mut wallet = MockSilentPaymentWallet::new(); - wallet.expect_get_sp_wallet().returning(|| Ok(MockSpWallet::new())); - WALLET.set(wallet).expect("WALLET should only be initialized once"); + wallet + .expect_get_sp_wallet() + .returning(|| Ok(MockSpWallet::new())); + WALLET + .set(wallet) + .expect("WALLET should only be initialized once"); println!("Initialized WALLET"); } @@ -505,7 +552,7 @@ mod tests { CACHEDPROCESSES .set(Mutex::new(HashMap::new())) .expect("CACHEDPROCESSES should only be initialized once"); - + println!("Initialized CACHEDPROCESSES"); } @@ -513,7 +560,7 @@ mod tests { let wallet_file = StateFile::new(PathBuf::from_str(TMP_WALLET).unwrap()); let processes_file = StateFile::new(PathBuf::from_str(TMP_PROCESSES).unwrap()); let members_file = StateFile::new(PathBuf::from_str(TMP_MEMBERS).unwrap()); - + wallet_file.create().unwrap(); processes_file.create().unwrap(); members_file.create().unwrap(); @@ -521,7 +568,7 @@ mod tests { let disk_storage = DiskStorage { wallet_file, processes_file, - members_file + members_file, }; STORAGE .set(Mutex::new(disk_storage)) @@ -532,9 +579,17 @@ mod tests { } fn mock_commit_msg(process_id: OutPoint) -> CommitMessage { - let field_names = ["a".to_owned(), "b".to_owned(), "pub_a".to_owned(), "roles".to_owned()]; - let pairing_id = OutPoint::from_str("b0c8378ee68e9a73836b04423ddb6de9fc0e2e715e04ffe6aa34117bb1025f01:0").unwrap(); - let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]); + let field_names = [ + "a".to_owned(), + "b".to_owned(), + "pub_a".to_owned(), + "roles".to_owned(), + ]; + let pairing_id = OutPoint::from_str( + "b0c8378ee68e9a73836b04423ddb6de9fc0e2e715e04ffe6aa34117bb1025f01:0", + ) + .unwrap(); + let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]); let validation_rule = ValidationRule::new(1.0, Vec::from(field_names), 1.0).unwrap(); let role_def = RoleDefinition { @@ -544,8 +599,14 @@ mod tests { }; let roles = Roles::new(BTreeMap::from([(String::from("role_name"), role_def)])); let public_data = TryInto::::try_into(json!({"pub_a": Value::Null})).unwrap(); - let clear_state = TryInto::::try_into(json!({"a": Value::Null, "b": Value::Null})).unwrap(); - let pcd_commitments = PcdCommitments::new(&process_id, &Pcd::new(public_data.clone().into_iter().chain(clear_state).collect()), &roles).unwrap(); + let clear_state = + TryInto::::try_into(json!({"a": Value::Null, "b": Value::Null})).unwrap(); + let pcd_commitments = PcdCommitments::new( + &process_id, + &Pcd::new(public_data.clone().into_iter().chain(clear_state).collect()), + &roles, + ) + .unwrap(); let commit_msg = CommitMessage { process_id, @@ -562,7 +623,8 @@ mod tests { #[test] fn test_handle_commit_new_process() { initialize_static_variables(); - let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); + let init_tx = + deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let process_id = OutPoint::new(init_txid, 0); @@ -584,7 +646,10 @@ mod tests { let updated_process = cache.get(&process_id); assert!(updated_process.is_some()); - let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap(); + let concurrent_states = updated_process + .unwrap() + .get_latest_concurrent_states() + .unwrap(); // Constructing the roles_map that was inserted in the process let roles_object = serde_json::to_value(roles).unwrap(); @@ -603,7 +668,8 @@ mod tests { #[test] fn test_handle_commit_new_state() { initialize_static_variables(); - let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); + let init_tx = + deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let process_id = OutPoint::new(init_txid, 0); @@ -613,7 +679,12 @@ mod tests { let pcd_commitment = commit_msg.pcd_commitment.clone(); let process = Process::new(process_id); - CACHEDPROCESSES.get().unwrap().lock().unwrap().insert(process_id, process); + CACHEDPROCESSES + .get() + .unwrap() + .lock() + .unwrap() + .insert(process_id, process); let result = handle_commit_request(commit_msg); @@ -623,7 +694,10 @@ mod tests { let updated_process = cache.get(&process_id); assert!(updated_process.is_some()); - let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap(); + let concurrent_states = updated_process + .unwrap() + .get_latest_concurrent_states() + .unwrap(); let roles_object = serde_json::to_value(roles).unwrap(); let mut roles_map = Map::new(); diff --git a/src/daemon.rs b/src/daemon.rs index 6b2e0f7..63a5168 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -126,11 +126,7 @@ pub struct Daemon { } impl RpcCall for Daemon { - fn connect( - rpcwallet: Option, - rpc_url: String, - network: Network, - ) -> Result { + fn connect(rpcwallet: Option, rpc_url: String, network: Network) -> Result { let mut rpc = rpc_connect(rpcwallet, network, rpc_url)?; loop { @@ -300,11 +296,7 @@ impl RpcCall for Daemon { Ok(txid) } - fn get_transaction_info( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result { + fn get_transaction_info(&self, txid: &Txid, blockhash: Option) -> Result { // No need to parse the resulting JSON, just return it as-is to the client. self.rpc .call( @@ -314,11 +306,7 @@ impl RpcCall for Daemon { .context("failed to get transaction info") } - fn get_transaction_hex( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result { + fn get_transaction_hex(&self, txid: &Txid, blockhash: Option) -> Result { use sdk_common::sp_client::bitcoin::consensus::serde::{hex::Lower, Hex, With}; let tx = self.get_transaction(txid, blockhash)?; @@ -328,11 +316,7 @@ impl RpcCall for Daemon { serde_json::to_value(TxAsHex(tx)).map_err(Into::into) } - fn get_transaction( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result { + fn get_transaction(&self, txid: &Txid, blockhash: Option) -> Result { self.rpc .get_raw_transaction(txid, blockhash.as_ref()) .context("failed to get transaction") @@ -378,10 +362,7 @@ impl RpcCall for Daemon { .collect()) } - fn get_mempool_transactions( - &self, - txids: &[Txid], - ) -> Result>> { + fn get_mempool_transactions(&self, txids: &[Txid]) -> Result>> { let client = self.rpc.get_jsonrpc_client(); log::debug!("getting {} transactions", txids.len()); let args: Vec<_> = txids @@ -406,15 +387,12 @@ impl RpcCall for Daemon { }) .collect()) } - } pub(crate) trait RpcCall: Send + Sync + std::fmt::Debug { - fn connect( - rpcwallet: Option, - rpc_url: String, - network: Network, - ) -> Result where Self: Sized; + fn connect(rpcwallet: Option, rpc_url: String, network: Network) -> Result + where + Self: Sized; fn estimate_fee(&self, nblocks: u16) -> Result; @@ -451,23 +429,11 @@ pub(crate) trait RpcCall: Send + Sync + std::fmt::Debug { fn broadcast(&self, tx: &Transaction) -> Result; - fn get_transaction_info( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result; + fn get_transaction_info(&self, txid: &Txid, blockhash: Option) -> Result; - fn get_transaction_hex( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result; + fn get_transaction_hex(&self, txid: &Txid, blockhash: Option) -> Result; - fn get_transaction( - &self, - txid: &Txid, - blockhash: Option, - ) -> Result; + fn get_transaction(&self, txid: &Txid, blockhash: Option) -> Result; fn get_block_txids(&self, blockhash: BlockHash) -> Result>; @@ -478,10 +444,7 @@ pub(crate) trait RpcCall: Send + Sync + std::fmt::Debug { txids: &[Txid], ) -> Result>>; - fn get_mempool_transactions( - &self, - txids: &[Txid], - ) -> Result>>; + fn get_mempool_transactions(&self, txids: &[Txid]) -> Result>>; } pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError; diff --git a/src/faucet.rs b/src/faucet.rs index b2d1dc4..8331ddf 100644 --- a/src/faucet.rs +++ b/src/faucet.rs @@ -31,7 +31,10 @@ use anyhow::{Error, Result}; use crate::lock_freezed_utxos; use crate::scan::check_transaction_alone; -use crate::{scan::compute_partial_tweak_to_transaction, MutexExt, DAEMON, FAUCET_AMT, WALLET, SilentPaymentAddress}; +use crate::{ + scan::compute_partial_tweak_to_transaction, MutexExt, SilentPaymentAddress, DAEMON, FAUCET_AMT, + WALLET, +}; fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> { let core = DAEMON @@ -63,8 +66,14 @@ fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> { } } -fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result<(Transaction, PublicKey)> { - let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?.lock_anyhow()?; +fn faucet_send( + sp_address: SilentPaymentAddress, + commitment: &str, +) -> Result<(Transaction, PublicKey)> { + let sp_wallet = WALLET + .get() + .ok_or(Error::msg("Wallet not initialized"))? + .lock_anyhow()?; let fee_estimate = DAEMON .get() @@ -85,7 +94,8 @@ fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result<(Tr let freezed_utxos = lock_freezed_utxos()?; // We filter out the freezed utxos from available list - let available_outpoints: Vec<(OutPoint, OwnedOutput)> = sp_wallet.get_outputs() + let available_outpoints: Vec<(OutPoint, OwnedOutput)> = sp_wallet + .get_outputs() .iter() .filter_map(|(outpoint, output)| { if !freezed_utxos.contains(&outpoint) { @@ -101,13 +111,12 @@ fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result<(Tr // We try to pay the faucet amount if let Ok(unsigned_transaction) = create_transaction( - available_outpoints, - sp_wallet.get_sp_client(), - vec![recipient], - Some(Vec::from_hex(commitment).unwrap()), - FeeRate::from_sat_per_vb(fee_estimate.to_sat() as f32), - ) - { + available_outpoints, + sp_wallet.get_sp_client(), + vec![recipient], + Some(Vec::from_hex(commitment).unwrap()), + FeeRate::from_sat_per_vb(fee_estimate.to_sat() as f32), + ) { let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?; let partial_tweak = compute_partial_tweak_to_transaction(&final_tx)?; @@ -124,7 +133,7 @@ fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result<(Tr // We immediately add the new tx to our wallet to prevent accidental double spend check_transaction_alone(sp_wallet, &final_tx, &partial_tweak)?; - Ok((final_tx, partial_tweak)) + Ok((final_tx, partial_tweak)) } else { // let's try to spend directly from the mining address let secp = Secp256k1::signing_only(); diff --git a/src/main.rs b/src/main.rs index 32f0f0d..f46a425 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,26 +10,41 @@ use std::{ sync::{Mutex, MutexGuard, OnceLock}, }; -use bitcoincore_rpc::{bitcoin::secp256k1::SecretKey, json::{self as bitcoin_json}}; +use bitcoincore_rpc::{ + bitcoin::secp256k1::SecretKey, + json::{self as bitcoin_json}, +}; use commit::{lock_members, MEMBERLIST}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error, warn}; use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE}; use scan::{check_transaction_alone, compute_partial_tweak_to_transaction}; -use sdk_common::{network::HandshakeMessage, pcd::Member, process::{lock_processes, Process, CACHEDPROCESSES}, serialization::{OutPointMemberMap, OutPointProcessMap}, silentpayments::SpWallet, sp_client::{bitcoin::{ - consensus::deserialize, - hex::{DisplayHex, FromHex}, - Amount, Network, Transaction, -}, silentpayments::SilentPaymentAddress, OwnedOutput}, MutexExt}; use sdk_common::sp_client::{ - bitcoin::OutPoint, bitcoin::secp256k1::rand::{thread_rng, Rng}, - SpClient, SpendKey + bitcoin::OutPoint, + SpClient, SpendKey, }; use sdk_common::{ error::AnkError, network::{AnkFlag, NewTxMessage}, }; +use sdk_common::{ + network::HandshakeMessage, + pcd::Member, + process::{lock_processes, Process, CACHEDPROCESSES}, + serialization::{OutPointMemberMap, OutPointProcessMap}, + silentpayments::SpWallet, + sp_client::{ + bitcoin::{ + consensus::deserialize, + hex::{DisplayHex, FromHex}, + Amount, Network, Transaction, + }, + silentpayments::SilentPaymentAddress, + OwnedOutput, + }, + MutexExt, +}; use serde_json::Value; use tokio::net::{TcpListener, TcpStream}; @@ -40,16 +55,19 @@ use tokio_tungstenite::tungstenite::Message; use anyhow::{Error, Result}; use zeromq::{Socket, SocketRecv}; +mod commit; mod config; mod daemon; mod electrumclient; mod faucet; mod message; mod scan; -mod commit; use crate::config::Config; -use crate::{daemon::{Daemon, RpcCall}, scan::scan_blocks}; +use crate::{ + daemon::{Daemon, RpcCall}, + scan::scan_blocks, +}; type Tx = UnboundedSender; @@ -147,7 +165,11 @@ fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> { Ok(()) } -async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, our_sp_address: SilentPaymentAddress) { +async fn handle_connection( + raw_stream: TcpStream, + addr: SocketAddr, + our_sp_address: SilentPaymentAddress, +) { debug!("Incoming TCP connection from: {}", addr); let peers = PEERMAP.get().expect("Peer Map not initialized"); @@ -177,17 +199,16 @@ async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, our_sp_addre let members = lock_members().unwrap().clone(); let init_msg = HandshakeMessage::new( - our_sp_address.to_string(), + our_sp_address.to_string(), OutPointMemberMap(members), OutPointProcessMap(processes), ); if let Err(e) = broadcast_message( - AnkFlag::Handshake, - format!("{}", init_msg.to_string()), - BroadcastType::Sender(addr) - ) - { + AnkFlag::Handshake, + format!("{}", init_msg.to_string()), + BroadcastType::Sender(addr), + ) { log::error!("Failed to send init message: {}", e); return; } @@ -230,7 +251,10 @@ fn create_new_tx_message(transaction: Vec) -> Result { let partial_tweak = compute_partial_tweak_to_transaction(&tx)?; - let sp_wallet = WALLET.get().ok_or_else(|| Error::msg("Wallet not initialized"))?.lock_anyhow()?; + let sp_wallet = WALLET + .get() + .ok_or_else(|| Error::msg("Wallet not initialized"))? + .lock_anyhow()?; check_transaction_alone(sp_wallet, &tx, &partial_tweak)?; Ok(NewTxMessage::new( @@ -391,7 +415,7 @@ async fn main() -> Result<()> { Ok(members) => { let deserialized: OutPointMemberMap = serde_json::from_value(members)?; deserialized.0 - }, + } Err(_) => { debug!("creating members file at {}", members_file.path.display()); members_file.create()?; @@ -401,10 +425,9 @@ async fn main() -> Result<()> { }; { - let utxo_to_freeze: HashSet = cached_processes.iter() - .map(|(_, process)| { - process.get_last_unspent_outpoint().unwrap() - }) + let utxo_to_freeze: HashSet = cached_processes + .iter() + .map(|(_, process)| process.get_last_unspent_outpoint().unwrap()) .collect(); let mut freezed_utxos = lock_freezed_utxos()?; @@ -413,10 +436,7 @@ async fn main() -> Result<()> { let our_sp_address = sp_wallet.get_sp_client().get_receiving_address(); - log::info!( - "Using wallet with address {}", - our_sp_address, - ); + log::info!("Using wallet with address {}", our_sp_address,); log::info!( "Found {} outputs for a total balance of {}", @@ -444,9 +464,7 @@ async fn main() -> Result<()> { members_file, }; - STORAGE - .set(Mutex::new(storage)) - .unwrap(); + STORAGE.set(Mutex::new(storage)).unwrap(); if last_scan < current_tip { log::info!("Scanning for our outputs"); diff --git a/src/message.rs b/src/message.rs index 5281fb6..8737ee8 100644 --- a/src/message.rs +++ b/src/message.rs @@ -10,7 +10,9 @@ use tokio_tungstenite::tungstenite::Message; use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage}; -use crate::{commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP}; +use crate::{ + commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP, +}; pub(crate) static MESSAGECACHE: OnceLock = OnceLock::new(); @@ -55,9 +57,7 @@ impl MessageCache { let mut store = cache.store.lock().unwrap(); let now = Instant::now(); - store.retain(|_, entrytime| { - now.duration_since(*entrytime) <= MESSAGECACHEDURATION - }); + store.retain(|_, entrytime| now.duration_since(*entrytime) <= MESSAGECACHEDURATION); } } } @@ -84,7 +84,7 @@ pub(crate) fn broadcast_message( match ank_msg.flag { AnkFlag::Cipher => log::debug!("Broadcasting cipher"), AnkFlag::Handshake => log::debug!("Broadcasting handshake"), - _ => log::debug!("Broadcasting {} message: {}", ank_msg.flag.as_str(),msg) + _ => log::debug!("Broadcasting {} message: {}", ank_msg.flag.as_str(), msg), } match broadcast { BroadcastType::Sender(addr) => { @@ -184,16 +184,15 @@ fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) { Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint), Err(e) => { log::error!("handle_commit_request returned error: {}", e); - // Temporary fix: we remove the message from the cache in case the client wants to try again + // Temporary fix: we remove the message from the cache in case the client wants to try again let cache = MESSAGECACHE.get().expect("Cache should be initialized"); cache.remove(ank_msg.to_string().as_str()); commit_msg.error = Some(e.into()); if let Err(e) = broadcast_message( - AnkFlag::Commit, + AnkFlag::Commit, serde_json::to_string(&commit_msg).expect("This shouldn't fail"), - BroadcastType::Sender(addr) - ) - { + BroadcastType::Sender(addr), + ) { log::error!("Failed to broadcast message: {}", e); } } diff --git a/src/scan.rs b/src/scan.rs index 3eb2d42..4a78b22 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -85,7 +85,11 @@ fn get_script_to_secret_map( Ok(res) } -pub fn check_transaction_alone(mut wallet: MutexGuard, tx: &Transaction, tweak_data: &PublicKey) -> Result> { +pub fn check_transaction_alone( + mut wallet: MutexGuard, + tx: &Transaction, + tweak_data: &PublicKey, +) -> Result> { let updates = match wallet.update_with_transaction(tx, tweak_data, 0) { Ok(updates) => updates, Err(e) => { @@ -95,8 +99,13 @@ pub fn check_transaction_alone(mut wallet: MutexGuard, tx: &Transactio }; if updates.len() > 0 { - let storage = STORAGE.get().ok_or_else(|| Error::msg("Failed to get STORAGE"))?; - storage.lock_anyhow()?.wallet_file.save(&serde_json::to_value(wallet.clone())?)?; + let storage = STORAGE + .get() + .ok_or_else(|| Error::msg("Failed to get STORAGE"))?; + storage + .lock_anyhow()? + .wallet_file + .save(&serde_json::to_value(wallet.clone())?)?; } Ok(updates) @@ -237,7 +246,10 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res log::info!("Starting a rescan"); let electrum_client = electrumclient::create_electrum_client(electrum_url)?; - let mut sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?.lock_anyhow()?; + let mut sp_wallet = WALLET + .get() + .ok_or(Error::msg("Wallet not initialized"))? + .lock_anyhow()?; let core = DAEMON .get() @@ -278,9 +290,12 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res for (blkheight, blkhash, blkfilter) in filters { let spk2secret = match tweak_data_map.remove(&blkheight) { - Some(tweak_data_vec) => { - get_script_to_secret_map(&sp_wallet.get_sp_client().sp_receiver, tweak_data_vec, scan_sk.into(), &secp)? - } + Some(tweak_data_vec) => get_script_to_secret_map( + &sp_wallet.get_sp_client().sp_receiver, + tweak_data_vec, + scan_sk.into(), + &secp, + )?, None => HashMap::new(), }; @@ -288,7 +303,8 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect(); // check if owned inputs are spent - let owned_spks: Vec> = sp_wallet.get_outputs() + let owned_spks: Vec> = sp_wallet + .get_outputs() .iter() .map(|(_, output)| { let script = output.script.to_bytes(); @@ -302,12 +318,14 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res let blk = core.get_block(blkhash)?; // scan block for new outputs, and add them to our list - let utxo_created_in_block = - scan_block_outputs(&sp_wallet.get_sp_client().sp_receiver, &blk.txdata, blkheight.into(), spk2secret)?; + let utxo_created_in_block = scan_block_outputs( + &sp_wallet.get_sp_client().sp_receiver, + &blk.txdata, + blkheight.into(), + spk2secret, + )?; if !utxo_created_in_block.is_empty() { - sp_wallet - .get_mut_outputs() - .extend(utxo_created_in_block); + sp_wallet.get_mut_outputs().extend(utxo_created_in_block); } // update the list of outputs just in case @@ -332,9 +350,13 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res ); // update last_scan height - sp_wallet - .set_last_scan(end); - STORAGE.get().unwrap().lock_anyhow()?.wallet_file.save(&serde_json::to_value(sp_wallet.clone())?)?; + sp_wallet.set_last_scan(end); + STORAGE + .get() + .unwrap() + .lock_anyhow()? + .wallet_file + .save(&serde_json::to_value(sp_wallet.clone())?)?; Ok(()) }