use std::str::FromStr; use std::{ collections::HashMap, sync::{Mutex, MutexGuard, OnceLock}, }; use anyhow::{Error, Result}; use bitcoincore_rpc::bitcoin::hex::DisplayHex; use hex::FromHex; use sdk_common::pcd::{Member, Pcd, RoleDefinition}; use sdk_common::serialization::{MemberOutPointMap, OutPointProcessMap}; use sdk_common::silentpayments::create_transaction; use sdk_common::sp_client::spclient::Recipient; use sdk_common::network::CommitMessage; use sdk_common::sp_client::bitcoin::consensus::deserialize; use sdk_common::sp_client::bitcoin::{Amount, Transaction, OutPoint}; use sdk_common::process::{lock_processes, Process, ProcessState, CACHEDPROCESSES}; use serde_json::json; use serde_json::Value; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { // Attempt to process the initial transaction or reference match parse_initial_tx(&commit_msg.init_tx)? { InitialTx::Transaction(tx) => handle_initial_transaction(tx, &commit_msg), InitialTx::OutPoint(outpoint) => handle_existing_commitment(outpoint, commit_msg), } } // Enum to differentiate between parsed transaction and outpoint enum InitialTx { Transaction(Transaction), OutPoint(OutPoint), } // Parse the initial transaction as either a `Transaction` or `OutPoint` fn parse_initial_tx(init_tx: &str) -> Result { if let Ok(tx_hex) = Vec::from_hex(init_tx) { let tx = deserialize::(&tx_hex)?; Ok(InitialTx::Transaction(tx)) } else if let Ok(outpoint) = OutPoint::from_str(init_tx) { Ok(InitialTx::OutPoint(outpoint)) } else { Err(Error::msg("init_tx must be a valid transaction or txid")) } } // Handle the case where `init_tx` is a new transaction fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Result { let root_commitment = OutPoint::new(tx.txid(), 0); // Validation tokens must be empty for the initial transaction if !commit_msg.validation_tokens.is_empty() { return Err(anyhow::Error::msg( "Validation tokens must be empty".to_string(), )); } // Broadcast the transaction let daemon = DAEMON.get().unwrap().lock_anyhow()?; daemon.broadcast(&tx)?; // Process roles and commitments let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? }); let parsed_roles = roles_only_map.extract_roles()?; // let roles_commitment = roles_only_map.hash_fields(root_commitment)?; // TODO make that kind of check reliable, needs more work on json serialization // if roles_commitment.get("roles") != commit_msg.pcd_commitment.get("roles") { // return Err(Error::msg("Role commitment mismatch")); // } let pcd_commitment = &commit_msg.pcd_commitment; let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap(); match handle_member_list(pcd_commitment, &parsed_roles, root_commitment) { Ok(()) => { dump_cached_members()?; } Err(e) => log::error!("{}", e) } let mut new_process = Process::new(root_commitment); let init_state = ProcessState { commited_in: root_commitment, encrypted_pcd: roles_only_map, pcd_commitment: commit_msg.pcd_commitment.clone(), state_id: merkle_root_bin.to_lower_hex_string(), ..Default::default() }; new_process.insert_concurrent_state(init_state)?; // Cache the process lock_processes()?.insert( root_commitment, new_process, ); // Dump to disk dump_cached_processes()?; // Add to frozen UTXOs lock_freezed_utxos()?.insert(root_commitment); Ok(root_commitment) } pub static MEMBERLIST: OnceLock>> = OnceLock::new(); pub fn lock_members() -> Result>, anyhow::Error> { MEMBERLIST .get_or_init(|| Mutex::new(HashMap::new())) .lock_anyhow() } fn handle_member_list(pcd_commitment: &Value, roles: &HashMap, root_commitment: OutPoint) -> Result<()> { //Check if the keys exists in the pcd if let Value::Object(ref pcd_map) = pcd_commitment { if !pcd_map.contains_key("key_parity") || !pcd_map.contains_key("session_privkey") || !pcd_map.contains_key("session_pubkey") { return Err(Error::msg("Process is not a pairing process")); } } else { return Err(Error::msg("Pcd is missing")); } //Check if there is one role with one member if roles.len() != 1 { return Err(Error::msg("Process is not a pairing process")); } if let Some(owner_role) = roles.get("owner") { if owner_role.members.len() == 1 { let member = owner_role.members.get(0).unwrap(); let mut memberlist = lock_members()?; memberlist.insert( member.clone(), root_commitment, ); return Ok(()); } } Err(Error::msg("Process is not a pairing process")) } // Handle the case where `init_tx` is a reference to an existing outpoint fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result { let mut processes = lock_processes()?; let process = processes .get_mut(&outpoint) .ok_or(Error::msg(format!("Commitment not found: {}", outpoint)))?; if commit_msg.validation_tokens.is_empty() { register_new_state(process, commit_msg) } else { process_validation(process, commit_msg) } } pub fn dump_cached_members() -> Result<(), anyhow::Error> { let members = lock_members()?.clone(); let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?; let members_file = &storage.members_file; let members_map = MemberOutPointMap(members); let json = serde_json::to_value(&members_map)?; members_file.save(&json)?; log::debug!("saved members"); Ok(()) } pub fn dump_cached_processes() -> Result<(), anyhow::Error> { let processes = lock_processes()?.clone(); let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?; let processes_file = &storage.processes_file; let outpoints_map = OutPointProcessMap(processes); let json = serde_json::to_value(&outpoints_map)?; processes_file.save(&json)?; log::debug!("saved processes"); Ok(()) } // Register a new state when validation tokens are empty fn register_new_state(commitment: &mut Process, commit_msg: CommitMessage) -> Result { let concurrent_states = commitment.get_latest_concurrent_states()?; let (empty_state, actual_states) = concurrent_states.split_last().unwrap(); let current_outpoint = empty_state.commited_in; let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().unwrap().to_lower_hex_string(); // Ensure no duplicate states if actual_states .iter() .any(|state| state.state_id == new_state_id) { return Err(Error::msg("Proposed state already exists")); } // Add the new state let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? }); let new_state = ProcessState { commited_in: current_outpoint, pcd_commitment: commit_msg.pcd_commitment, encrypted_pcd: roles_only_map, state_id: new_state_id, ..Default::default() }; commitment.insert_concurrent_state(new_state)?; Ok(current_outpoint) } // Process validation for a state with validation tokens fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) -> Result { let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))?; let new_state_id_hex = new_state_id.to_lower_hex_string(); let mut state_to_validate = updated_process .get_latest_concurrent_states()? .into_iter() .find(|state| state.state_id == new_state_id_hex) .ok_or(Error::msg("Unknown state"))? .clone(); state_to_validate.validation_tokens = commit_msg.validation_tokens; state_to_validate.is_valid(updated_process.get_latest_commited_state())?; let commited_in = commit_new_transaction(updated_process, state_to_validate)?; Ok(commited_in) } // Commit the new transaction and update the process state fn commit_new_transaction( updated_process: &mut Process, state_to_commit: ProcessState, ) -> Result { let sp_wallet = WALLET .get() .ok_or(Error::msg("Wallet not initialized"))? .get_wallet()?; let recipient = Recipient { address: sp_wallet.get_client().get_receiving_address(), amount: Amount::from_sat(1000), nb_outputs: 1, }; let daemon = DAEMON.get().unwrap().lock_anyhow()?; let fee_rate = daemon.estimate_fee(6) .unwrap_or(Amount::from_sat(1000)) .checked_div(1000) .unwrap(); let mut freezed_utxos = lock_freezed_utxos()?; let next_commited_in = updated_process.get_process_tip()?; let mandatory_input = if let Some(next_outpoint) = freezed_utxos.take(&next_commited_in) { next_outpoint } else { return Err(Error::msg(format!("Missing next commitment outpoint for process {}", updated_process.get_process_id()?))); }; let commitment_payload = Vec::from_hex(state_to_commit.state_id)?; let psbt = create_transaction( vec![mandatory_input], &freezed_utxos, &sp_wallet, vec![recipient], Some(commitment_payload), fee_rate, None, )?; let new_tx = psbt.extract_tx()?; let txid = daemon.broadcast(&new_tx)?; let commited_in = OutPoint::new(txid, 0); freezed_utxos.insert(commited_in); updated_process.update_states_tip(commited_in)?; Ok(commited_in) } #[cfg(test)] mod tests { use super::*; use bitcoincore_rpc::bitcoin::hex::DisplayHex; use sdk_common::pcd::Member; use sdk_common::pcd::RoleDefinition; use sdk_common::pcd::ValidationRule; use sdk_common::sp_client::silentpayments::utils::SilentPaymentAddress; use serde_json::json; use mockall::predicate::*; use mockall::mock; use std::collections::HashMap; use std::sync::Mutex; use bitcoincore_rpc::bitcoin::*; use crate::daemon::RpcCall; use std::sync::OnceLock; use sdk_common::sp_client::bitcoin::consensus::serialize; use serde_json::{Map, Value}; const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v"; const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000"; // Define the mock for Daemon with the required methods mock! { #[derive(Debug)] pub Daemon {} impl RpcCall for Daemon { fn connect( rpcwallet: Option, rpc_url: String, network: bitcoincore_rpc::bitcoin::Network, ) -> Result where Self: Sized; fn estimate_fee(&self, nblocks: u16) -> Result; fn get_relay_fee(&self) -> Result; fn get_current_height(&self) -> Result; fn get_block(&self, block_hash: BlockHash) -> Result; fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, bip158::BlockFilter)>; fn list_unspent_from_to( &self, minamt: Option, ) -> Result>; fn create_psbt( &self, unspents: &[bitcoincore_rpc::json::ListUnspentResultEntry], spk: ScriptBuf, network: Network, ) -> Result; fn process_psbt(&self, psbt: String) -> Result; fn finalize_psbt(&self, psbt: String) -> Result; fn get_network(&self) -> Result; fn test_mempool_accept( &self, tx: &Transaction, ) -> Result; fn broadcast(&self, tx: &Transaction) -> Result; fn get_transaction_info( &self, txid: &Txid, blockhash: Option, ) -> Result; fn get_transaction_hex( &self, txid: &Txid, blockhash: Option, ) -> Result; fn get_transaction( &self, txid: &Txid, blockhash: Option, ) -> Result; fn get_block_txids(&self, blockhash: BlockHash) -> Result>; fn get_mempool_txids(&self) -> Result>; fn get_mempool_entries( &self, txids: &[Txid], ) -> Result>>; fn get_mempool_transactions( &self, txids: &[Txid], ) -> Result>>; } } mock! { #[derive(Debug)] pub SpWallet { fn get_receiving_address(&self) -> Result; } } mock! { #[derive(Debug)] pub SilentPaymentWallet { fn get_sp_wallet(&self) -> Result; } } static WALLET: OnceLock = OnceLock::new(); pub fn initialize_static_variables() { if DAEMON.get().is_none() { let mut daemon = MockDaemon::new(); daemon.expect_broadcast() .withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION) .returning(|tx| Ok(tx.txid())); DAEMON.set(Mutex::new(Box::new(daemon))).expect("DAEMON should only be initialized once"); println!("Initialized DAEMON"); } if WALLET.get().is_none() { let mut wallet = MockSilentPaymentWallet::new(); wallet.expect_get_sp_wallet().returning(|| Ok(MockSpWallet::new())); WALLET.set(wallet).expect("WALLET should only be initialized once"); println!("Initialized WALLET"); } if CACHEDPROCESSES.get().is_none() { CACHEDPROCESSES .set(Mutex::new(HashMap::new())) .expect("CACHEDPROCESSES should only be initialized once"); println!("Initialized CACHEDPROCESSES"); } } fn mock_commit_msg(init_tx: Transaction, first: bool) -> CommitMessage { let field_name = "roles".to_owned(); let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]).unwrap(); let validation_rule = ValidationRule::new(1.0, vec![field_name.clone()], 1.0).unwrap(); let role_def = RoleDefinition { members: vec![member], validation_rules: vec![validation_rule], storages: vec![], }; let roles = HashMap::from([(String::from("role_name"), role_def)]); let pcd_commitment = json!({field_name: "b30212b9649054b71f938fbe0d1c08e72de95bdb12b8008082795c6e9c4ad26a"}); let init_tx = if first { serialize(&init_tx).to_lower_hex_string() } else { OutPoint::new(init_tx.txid(), 0).to_string() }; let commit_msg = CommitMessage { init_tx, roles: roles.clone(), validation_tokens: vec![], pcd_commitment: pcd_commitment.clone(), error: None, }; commit_msg } #[test] fn test_handle_commit_new_process() { initialize_static_variables(); let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let init_commitment = OutPoint::new(init_txid, 0); let commit_msg = mock_commit_msg(init_tx, true); let roles = commit_msg.roles.clone(); let pcd_commitment = commit_msg.pcd_commitment.clone(); let empty_state = ProcessState { commited_in: init_commitment, ..Default::default() }; let result = handle_commit_request(commit_msg); assert_eq!(result.unwrap(), init_commitment); let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap(); let updated_process = cache.get(&init_commitment); assert!(updated_process.is_some()); let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap(); // Constructing the roles_map that was inserted in the process let roles_object = serde_json::to_value(roles).unwrap(); let mut roles_map = Map::new(); roles_map.insert("roles".to_owned(), roles_object); let new_state = ProcessState { commited_in: init_commitment, pcd_commitment, encrypted_pcd: Value::Object(roles_map), ..Default::default() }; let target = vec![&empty_state, &new_state]; assert_eq!(concurrent_states, target); } #[test] fn test_handle_commit_new_state() { initialize_static_variables(); let init_tx = deserialize::(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap(); let init_txid = init_tx.txid(); let init_commitment = OutPoint::new(init_txid, 0); let commit_msg = mock_commit_msg(init_tx, false); let roles = commit_msg.roles.clone(); let pcd_commitment = commit_msg.pcd_commitment.clone(); let process = Process::new(init_commitment); CACHEDPROCESSES.get().unwrap().lock().unwrap().insert(init_commitment, process); let result = handle_commit_request(commit_msg); assert_eq!(result.unwrap(), init_commitment); let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap(); let updated_process = cache.get(&init_commitment); assert!(updated_process.is_some()); let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap(); let roles_object = serde_json::to_value(roles).unwrap(); let mut roles_map = Map::new(); roles_map.insert("roles".to_owned(), roles_object); let new_state = ProcessState { commited_in: init_commitment, pcd_commitment, encrypted_pcd: Value::Object(roles_map), ..Default::default() }; let empty_state = ProcessState { commited_in: init_commitment, ..Default::default() }; let target = vec![&empty_state, &new_state]; assert_eq!(concurrent_states, target); } // #[test] // fn test_handle_commit_request_invalid_init_tx() { // let commit_msg = CommitMessage { // init_tx: "invalid_tx_hex".to_string(), // roles: HashMap::new(), // validation_tokens: vec![], // pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(), // }; // // Call the function under test // let result = handle_commit_request(commit_msg); // // Assertions for error // assert!(result.is_err()); // assert_eq!(result.unwrap_err().to_string(), "init_tx must be a valid transaction or txid"); // } // // Example test for adding a new state to an existing commitment // #[test] // fn test_handle_commit_request_add_state() { // // Set up data for adding a state to an existing commitment // let commit_msg = CommitMessage { // init_tx: "existing_outpoint_hex".to_string(), // roles: HashMap::new(), // validation_tokens: vec![], // pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(), // }; // // Mock daemon and cache initialization // let mut daemon = MockDaemon::new(); // daemon.expect_broadcast().returning(|_| Ok(Txid::new())); // DAEMON.set(Arc::new(Mutex::new(daemon))).unwrap(); // let process_state = Process::new(vec![], vec![]); // CACHEDPROCESSES.lock().unwrap().insert(OutPoint::new("mock_txid", 0), process_state); // // Run the function // let result = handle_commit_request(commit_msg); // // Assert success and that a new state was added // assert!(result.is_ok()); // assert_eq!(result.unwrap(), OutPoint::new("mock_txid", 0)); // } // // Additional tests for errors and validation tokens would follow a similar setup }