use std::{ collections::HashMap, sync::{Mutex, MutexGuard, OnceLock}, }; use anyhow::{Error, Result}; use bitcoincore_rpc::bitcoin::hex::DisplayHex; use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage}; use sdk_common::process::{lock_processes, Process, ProcessState}; use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap}; use sdk_common::silentpayments::create_transaction; use sdk_common::sp_client::bitcoin::{Amount, OutPoint}; use sdk_common::sp_client::{FeeRate, Recipient}; use sdk_common::{ pcd::Member, silentpayments::sign_transaction, sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress}, }; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; use crate::{ message::{broadcast_message, BroadcastType}, CHAIN_TIP, }; pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { let mut processes = lock_processes()?; if let Some(process) = processes.get_mut(&commit_msg.process_id) { handle_existing_commitment(process, &commit_msg)?; } else { let new_process = handle_new_process(&commit_msg)?; // Cache the process processes.insert(commit_msg.process_id, new_process); } // Dump to disk dump_cached_processes(processes.clone())?; // Add to frozen UTXOs lock_freezed_utxos()?.insert(commit_msg.process_id); // Send an update to all connected clients if wallet is available if let Some(wallet_lock) = WALLET.get() { let our_sp_address = wallet_lock .lock_anyhow()? .get_sp_client() .get_receiving_address(); let mut new_process_map = HashMap::new(); let new_process = processes.get(&commit_msg.process_id).unwrap().clone(); new_process_map.insert(commit_msg.process_id, new_process); let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst); let init_msg = HandshakeMessage::new( our_sp_address.to_string(), OutPointMemberMap(HashMap::new()), OutPointProcessMap(new_process_map), current_tip.into(), ); if let Err(e) = broadcast_message( AnkFlag::Handshake, format!("{}", init_msg.to_string()), BroadcastType::ToAll, ) { log::error!("Failed to send handshake message: {}", e); } } else { log::debug!("WALLET not initialized: skipping initial handshake broadcast"); } Ok(commit_msg.process_id) } fn send_members_update(pairing_process_id: OutPoint) -> Result<()> { dump_cached_members()?; // Broadcast members update if wallet is available if let Some(wallet_lock) = WALLET.get() { if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) { let our_sp_address = wallet_lock .lock_anyhow()? .get_sp_client() .get_receiving_address(); let mut new_member_map = HashMap::new(); new_member_map.insert(pairing_process_id, new_member.clone()); let init_msg = HandshakeMessage::new( our_sp_address.into(), OutPointMemberMap(new_member_map), OutPointProcessMap(HashMap::new()), CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst).into(), ); if let Err(e) = broadcast_message( AnkFlag::Handshake, format!("{}", init_msg.to_string()), BroadcastType::ToAll, ) { log::warn!("Failed to send handshake message: {}", e); } Ok(()) } else { Err(Error::msg(format!( "Failed to find new member with process id {}", pairing_process_id ))) } } else { log::debug!("WALLET not initialized: skipping members update broadcast"); Ok(()) } } fn handle_new_process(commit_msg: &CommitMessage) -> Result { let pcd_commitment = &commit_msg.pcd_commitment; let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap(); if let Ok(pairing_process_id) = handle_member_list(&commit_msg) { send_members_update(pairing_process_id)?; } let mut new_process = Process::new(commit_msg.process_id); let init_state = ProcessState { commited_in: commit_msg.process_id, roles: commit_msg.roles.clone(), pcd_commitment: commit_msg.pcd_commitment.clone(), state_id: merkle_root_bin, public_data: commit_msg.public_data.clone(), ..Default::default() }; new_process.insert_concurrent_state(init_state)?; Ok(new_process) } pub static MEMBERLIST: OnceLock>> = OnceLock::new(); pub fn lock_members() -> Result>, anyhow::Error> { MEMBERLIST .get_or_init(|| Mutex::new(HashMap::new())) .lock_anyhow() } fn handle_member_list(commit_msg: &CommitMessage) -> Result { //Check if there is one role with one member if commit_msg.roles.len() != 1 { return Err(Error::msg("Process is not a pairing process")); } if let Some(pairing_role) = commit_msg.roles.get("pairing") { if !pairing_role.members.is_empty() { return Err(Error::msg("Process is not a pairing process")); } } else { return Err(Error::msg("Process is not a pairing process")); } if let Ok(paired_addresses) = commit_msg.public_data.get_as_json("pairedAddresses") { let paired_addresses: Vec = serde_json::from_value(paired_addresses.clone())?; let mut memberlist = lock_members()?; memberlist.insert(commit_msg.process_id, Member::new(paired_addresses)); return Ok(commit_msg.process_id); } Err(Error::msg("Process is not a pairing process")) } fn handle_existing_commitment( process_to_udpate: &mut Process, commit_msg: &CommitMessage, ) -> Result<()> { let process_id = process_to_udpate.get_process_id()?; match register_new_state(process_to_udpate, &commit_msg) { Ok(new_state_id) => log::debug!( "Registering new state for process {} with state id {}", process_id, new_state_id.to_lower_hex_string() ), Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id), } if commit_msg.validation_tokens.len() > 0 { log::debug!( "Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), process_id ); // If the validation succeed, we return a new tip process_validation(process_to_udpate, commit_msg)?; if let Ok(pairing_process_id) = handle_member_list(commit_msg) { debug_assert_eq!(pairing_process_id, process_id); send_members_update(process_id)?; } } Ok(()) } pub fn dump_cached_members() -> Result<(), anyhow::Error> { let members = lock_members()?.clone(); let storage = STORAGE .get() .ok_or(Error::msg("STORAGE is not initialized"))? .lock_anyhow()?; let members_file = &storage.members_file; let members_map = OutPointMemberMap(members); let json = serde_json::to_value(&members_map)?; members_file.save(&json)?; log::debug!("saved members"); Ok(()) } pub fn dump_cached_processes(processes: HashMap) -> Result<(), anyhow::Error> { let storage = STORAGE .get() .ok_or(Error::msg("STORAGE is not initialized"))? .lock_anyhow()?; let processes_file = &storage.processes_file; let outpoints_map = OutPointProcessMap(processes); let json = serde_json::to_value(&outpoints_map)?; processes_file.save(&json)?; log::debug!("saved processes"); Ok(()) } // Register a new state fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Result<[u8; 32]> { let last_commited_state = process.get_latest_commited_state(); let new_state_id = commit_msg .pcd_commitment .create_merkle_tree()? .root() .unwrap(); if let Some(state) = last_commited_state { if new_state_id == state.state_id { return Err(Error::msg(format!( "{}", new_state_id.to_lower_hex_string() ))); } } let concurrent_states = process.get_latest_concurrent_states()?; let (empty_state, actual_states) = concurrent_states.split_last().unwrap(); let current_outpoint = empty_state.commited_in; // Ensure no duplicate states if actual_states .iter() .any(|state| state.state_id == new_state_id) { return Err(Error::msg(format!( "{}", new_state_id.to_lower_hex_string() ))); } // Add the new state let new_state = ProcessState { commited_in: current_outpoint, pcd_commitment: commit_msg.pcd_commitment.clone(), state_id: new_state_id.clone(), roles: commit_msg.roles.clone(), public_data: commit_msg.public_data.clone(), ..Default::default() }; process.insert_concurrent_state(new_state)?; Ok(new_state_id) } // Process validation for a state with validation tokens fn process_validation( updated_process: &mut Process, commit_msg: &CommitMessage, ) -> Result { let new_state_id = if commit_msg.pcd_commitment.is_empty() { // We're dealing with an obliteration attempt [0u8; 32] } else { commit_msg .pcd_commitment .create_merkle_tree()? .root() .ok_or(Error::msg("Invalid merkle tree"))? }; { let state_to_update = updated_process.get_state_for_id_mut(&new_state_id)?; // Complete with the received tokens state_to_update .validation_tokens .extend(commit_msg.validation_tokens.iter()); state_to_update.validation_tokens.sort_unstable(); state_to_update.validation_tokens.dedup(); } let state_to_validate = updated_process.get_state_for_id(&new_state_id)?; let members = lock_members()?.clone(); state_to_validate.is_valid( updated_process.get_latest_commited_state(), &OutPointMemberMap(members), )?; let commited_in = commit_new_transaction(updated_process, state_to_validate.clone())?; Ok(commited_in) } // Commit the new transaction and update the process state fn commit_new_transaction( updated_process: &mut Process, state_to_commit: ProcessState, ) -> Result { let sp_wallet = WALLET .get() .ok_or(Error::msg("Wallet not initialized"))? .lock_anyhow()?; let commitment_payload = Vec::from(state_to_commit.state_id); let mut recipients = vec![]; recipients.push(Recipient { address: RecipientAddress::SpAddress(sp_wallet.get_sp_client().get_receiving_address()), amount: Amount::from_sat(1000), }); // TODO not sure if this is still used // If the process is a pairing, we add another output that directly pays the owner of the process // We can find out simply by looking at the members list if let Some(member) = lock_members()?.get(&updated_process.get_process_id().unwrap()) { // We just pick one of the devices of the member at random en pay to it, member can then share the private key between all devices // For now we take the first address let address: SilentPaymentAddress = member.get_addresses().get(0).unwrap().as_str().try_into()?; recipients.push(Recipient { address: RecipientAddress::SpAddress(address), amount: Amount::from_sat(1000), }); } // This output is used to generate publicly available public keys without having to go through too many loops let daemon = DAEMON.get().unwrap().lock_anyhow()?; let fee_rate = daemon .estimate_fee(6) .unwrap_or(Amount::from_sat(1000)) .checked_div(1000) .unwrap(); let mut freezed_utxos = lock_freezed_utxos()?; let next_commited_in = updated_process.get_process_tip()?; if !freezed_utxos.contains(&next_commited_in) { return Err(Error::msg(format!( "Missing next commitment outpoint for process {}", updated_process.get_process_id()? ))); }; let unspent_outputs = sp_wallet.get_unspent_outputs(); let mut available_outpoints = vec![]; // We push the next_commited_in at the top of the available outpoints if let Some(output) = unspent_outputs.get(&next_commited_in) { available_outpoints.push((next_commited_in, output.clone())); } // We filter out freezed utxos for (outpoint, output) in unspent_outputs { if !freezed_utxos.contains(&outpoint) { available_outpoints.push((outpoint, output)); } } let unsigned_transaction = create_transaction( available_outpoints, sp_wallet.get_sp_client(), recipients, Some(commitment_payload), FeeRate::from_sat_per_vb(fee_rate.to_sat() as f32), )?; let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?; daemon.test_mempool_accept(&final_tx)?; let txid = daemon.broadcast(&final_tx)?; let commited_in = OutPoint::new(txid, 0); freezed_utxos.insert(commited_in); freezed_utxos.remove(&next_commited_in); updated_process.remove_all_concurrent_states()?; updated_process.insert_concurrent_state(state_to_commit)?; updated_process.update_states_tip(commited_in)?; Ok(commited_in) } // TODO tests are broken, we need a complete overhaul to make it work again #[cfg(test)] mod tests { use super::*; use crate::daemon::RpcCall; use crate::DiskStorage; use crate::StateFile; use bitcoincore_rpc::bitcoin::consensus::deserialize; use bitcoincore_rpc::bitcoin::hex::DisplayHex; use bitcoincore_rpc::bitcoin::*; use mockall::mock; use mockall::predicate::*; use sdk_common::pcd::Member; use sdk_common::pcd::Pcd; use sdk_common::pcd::PcdCommitments; use sdk_common::pcd::RoleDefinition; use sdk_common::pcd::Roles; use sdk_common::pcd::ValidationRule; use sdk_common::process::CACHEDPROCESSES; use sdk_common::sp_client::bitcoin::consensus::serialize; use sdk_common::sp_client::bitcoin::hex::FromHex; use sdk_common::sp_client::silentpayments::SilentPaymentAddress; use serde_json::json; use serde_json::{Map, Value}; use std::collections::BTreeMap; use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; use std::sync::OnceLock; use std::sync::Once; const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v"; const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000"; const TMP_WALLET: &str = "/tmp/.4nk/wallet"; const TMP_PROCESSES: &str = "/tmp/.4nk/processes"; const TMP_MEMBERS: &str = "/tmp/.4nk/members"; static INIT_ONCE: Once = Once::new(); // Define the mock for Daemon with the required methods mock! { #[derive(Debug)] pub Daemon {} impl RpcCall for Daemon { fn connect( rpcwallet: Option, rpc_url: String, network: bitcoincore_rpc::bitcoin::Network, cookie_path: Option, ) -> Result where Self: Sized; fn estimate_fee(&self, nblocks: u16) -> Result; fn get_relay_fee(&self) -> Result; fn get_current_height(&self) -> Result; fn get_block(&self, block_hash: BlockHash) -> Result; fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, bip158::BlockFilter)>; fn list_unspent_from_to( &self, minamt: Option, ) -> Result>; fn create_psbt( &self, unspents: &[bitcoincore_rpc::json::ListUnspentResultEntry], spk: ScriptBuf, network: Network, ) -> Result; fn process_psbt(&self, psbt: String) -> Result; fn finalize_psbt(&self, psbt: String) -> Result; fn get_network(&self) -> Result; fn test_mempool_accept(&self, tx: &Transaction) -> Result; fn broadcast(&self, tx: &Transaction) -> Result; fn get_transaction_info(&self, txid: &Txid, blockhash: Option) -> Result; fn get_transaction_hex(&self, txid: &Txid, blockhash: Option) -> Result; fn get_transaction(&self, txid: &Txid, blockhash: Option) -> Result; fn get_block_txids(&self, blockhash: BlockHash) -> Result>; fn get_mempool_txids(&self) -> Result>; fn get_mempool_entries(&self, txids: &[Txid]) -> Result>>; fn get_mempool_transactions(&self, txids: &[Txid]) -> Result>>; } } mock! { #[derive(Debug)] pub SpWallet { fn get_receiving_address(&self) -> Result; } } mock! { #[derive(Debug)] pub SilentPaymentWallet { fn get_sp_wallet(&self) -> Result; } } static WALLET: OnceLock = OnceLock::new(); pub fn initialize_static_variables() { INIT_ONCE.call_once(|| { if DAEMON.get().is_none() { let mut daemon = MockDaemon::new(); daemon .expect_broadcast() .withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION) .returning(|tx| Ok(tx.txid())); DAEMON .set(Mutex::new(Box::new(daemon))) .expect("DAEMON should only be initialized once"); println!("Initialized DAEMON"); } if WALLET.get().is_none() { let mut wallet = MockSilentPaymentWallet::new(); wallet .expect_get_sp_wallet() .returning(|| Ok(MockSpWallet::new())); WALLET .set(wallet) .expect("WALLET should only be initialized once"); println!("Initialized WALLET"); } if CACHEDPROCESSES.get().is_none() { CACHEDPROCESSES .set(Mutex::new(HashMap::new())) .expect("CACHEDPROCESSES should only be initialized once"); println!("Initialized CACHEDPROCESSES"); } if STORAGE.get().is_none() { // Respect parent ".4nk" constraint: unique filenames under /tmp/.4nk let base_dir = PathBuf::from("/tmp/.4nk"); if let Err(e) = std::fs::create_dir_all(&base_dir) { eprintln!("Failed to create base test storage dir {:?}: {}", base_dir, e); } let uid = uuid::Uuid::new_v4(); let wallet_path = base_dir.join(format!("wallet_{}", uid)); let processes_path = base_dir.join(format!("processes_{}", uid)); let members_path = base_dir.join(format!("members_{}", uid)); let wallet_file = StateFile::new(wallet_path); let processes_file = StateFile::new(processes_path); let members_file = StateFile::new(members_path); wallet_file.create().unwrap(); processes_file.create().unwrap(); members_file.create().unwrap(); let disk_storage = DiskStorage { wallet_file, processes_file, members_file, }; STORAGE .set(Mutex::new(disk_storage)) .expect("STORAGE should initialize only once"); println!("Initialized STORAGE"); } }); } fn mock_commit_msg(process_id: OutPoint) -> CommitMessage { let field_names = [ "a".to_owned(), "b".to_owned(), "pub_a".to_owned(), "roles".to_owned(), ]; let pairing_id = OutPoint::from_str( "b0c8378ee68e9a73836b04423ddb6de9fc0e2e715e04ffe6aa34117bb1025f01:0", ) .unwrap(); let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]); let validation_rule = ValidationRule::new(1.0, Vec::from(field_names), 1.0).unwrap(); let role_def = RoleDefinition { members: vec![pairing_id], validation_rules: vec![validation_rule], storages: vec![], }; let roles = Roles::new(BTreeMap::from([(String::from("role_name"), role_def)])); let public_data = TryInto::::try_into(json!({"pub_a": Value::Null})).unwrap(); let clear_state = TryInto::::try_into(json!({"a": Value::Null, "b": Value::Null})).unwrap(); let pcd_commitments = PcdCommitments::new( &process_id, &Pcd::new(public_data.clone().into_iter().chain(clear_state).collect()), &roles, ) .unwrap(); let commit_msg = CommitMessage { process_id, roles, public_data, validation_tokens: vec![], pcd_commitment: pcd_commitments, error: None, }; commit_msg } #[test] fn test_handle_commit_new_process() { initialize_static_variables(); let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let process_id = OutPoint::new(init_txid, 0); let commit_msg = mock_commit_msg(process_id); let roles = commit_msg.roles.clone(); let pcd_commitment = commit_msg.pcd_commitment.clone(); let empty_state = ProcessState { commited_in: process_id, ..Default::default() }; let result = handle_commit_request(commit_msg); assert_eq!(result.unwrap(), process_id); let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap(); let updated_process = cache.get(&process_id); assert!(updated_process.is_some()); let concurrent_states = updated_process .unwrap() .get_latest_concurrent_states() .unwrap(); assert!(concurrent_states.len() >= 2); let first = &concurrent_states[0]; let second = &concurrent_states[concurrent_states.len() - 1]; assert_eq!(first.commited_in, process_id); assert_eq!(first.state_id, [0u8; 32]); assert_eq!(second.commited_in, process_id); assert!(!second.pcd_commitment.is_empty()); assert_ne!(second.state_id, [0u8; 32]); } #[test] fn test_handle_commit_new_state() { initialize_static_variables(); let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let process_id = OutPoint::new(init_txid, 0); let commit_msg = mock_commit_msg(process_id); let roles = commit_msg.roles.clone(); let pcd_commitment = commit_msg.pcd_commitment.clone(); let process = Process::new(process_id); CACHEDPROCESSES .get() .unwrap() .lock() .unwrap() .insert(process_id, process); let result = handle_commit_request(commit_msg); assert_eq!(result.unwrap(), process_id); let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap(); let updated_process = cache.get(&process_id); assert!(updated_process.is_some()); let concurrent_states = updated_process .unwrap() .get_latest_concurrent_states() .unwrap(); assert!(concurrent_states.len() >= 2); let first = &concurrent_states[0]; let second = &concurrent_states[concurrent_states.len() - 1]; assert_eq!(first.commited_in, process_id); assert_eq!(first.state_id, [0u8; 32]); assert_eq!(second.commited_in, process_id); assert!(!second.pcd_commitment.is_empty()); assert_ne!(second.state_id, [0u8; 32]); } }