From be0ba64eefe8c82b78b1897a8a8889f8fd565df9 Mon Sep 17 00:00:00 2001 From: NicolasCantu Date: Wed, 5 Feb 2025 14:36:33 +0100 Subject: [PATCH] Refactor commit --- src/commit.rs | 162 +++++++++++++++++++------------------------------- 1 file changed, 62 insertions(+), 100 deletions(-) diff --git a/src/commit.rs b/src/commit.rs index 09d89c1..0393a4c 100644 --- a/src/commit.rs +++ b/src/commit.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::{ collections::HashMap, sync::{Mutex, MutexGuard, OnceLock}, @@ -13,8 +12,7 @@ use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap}; use sdk_common::silentpayments::create_transaction; use sdk_common::sp_client::spclient::Recipient; use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage}; -use sdk_common::sp_client::bitcoin::consensus::deserialize; -use sdk_common::sp_client::bitcoin::{Amount, Transaction, OutPoint}; +use sdk_common::sp_client::bitcoin::{Amount, OutPoint}; use sdk_common::process::{lock_processes, Process, ProcessState}; use serde_json::json; @@ -22,48 +20,56 @@ use crate::message::{broadcast_message, BroadcastType}; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { - // Attempt to process the initial transaction or reference - match parse_initial_tx(&commit_msg.init_tx)? { - InitialTx::Transaction(tx) => handle_initial_transaction(tx, &commit_msg), - InitialTx::OutPoint(outpoint) => handle_existing_commitment(outpoint, commit_msg), - } -} - -// Enum to differentiate between parsed transaction and outpoint -enum InitialTx { - Transaction(Transaction), - OutPoint(OutPoint), -} - -// Parse the initial transaction as either a `Transaction` or `OutPoint` -fn parse_initial_tx(init_tx: &str) -> Result { - if let Ok(tx_hex) = Vec::from_hex(init_tx) { - let tx = deserialize::(&tx_hex)?; - Ok(InitialTx::Transaction(tx)) - } else if let Ok(outpoint) = OutPoint::from_str(init_tx) { - Ok(InitialTx::OutPoint(outpoint)) + let mut processes = lock_processes()?; + if let Some(process) = processes.get_mut(&commit_msg.process_id) { + handle_existing_commitment(process, &commit_msg)?; } else { - Err(Error::msg("init_tx must be a valid transaction or txid")) + let new_process = handle_new_process(&commit_msg)?; + // Cache the process + processes.insert( + commit_msg.process_id, + new_process, + ); } + + // Dump to disk + dump_cached_processes(processes.clone())?; + + // Add to frozen UTXOs + lock_freezed_utxos()?.insert(commit_msg.process_id); + + // Update processes with the + // Send an update to all connected client + let our_sp_address = WALLET + .get() + .ok_or(Error::msg("Wallet not initialized"))? + .get_wallet()? + .get_client() + .get_receiving_address(); + let mut new_process_map = HashMap::new(); + let new_process = processes.get(&commit_msg.process_id).unwrap().clone(); + new_process_map.insert(commit_msg.process_id, new_process); + let init_msg = HandshakeMessage::new( + our_sp_address, + OutPointMemberMap(HashMap::new()), + OutPointProcessMap(new_process_map), + ); + + if let Err(e) = broadcast_message( + AnkFlag::Handshake, + format!("{}", init_msg.to_string()), + BroadcastType::ToAll + ) + { + log::error!("Failed to send handshake message: {}", e); + } + + Ok(commit_msg.process_id) } -// Handle the case where `init_tx` is a new transaction -fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Result { - let process_id = OutPoint::new(tx.txid(), 0); - - // Validation tokens must be empty for the initial transaction - if !commit_msg.validation_tokens.is_empty() { - return Err(anyhow::Error::msg( - "Validation tokens must be empty".to_string(), - )); - } - - // Broadcast the transaction - let daemon = DAEMON.get().unwrap().lock_anyhow()?; - daemon.broadcast(&tx)?; - +fn handle_new_process(commit_msg: &CommitMessage) -> Result { // Process roles and commitments - let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? }); + let roles_only_map = json!({ "roles": serde_json::to_value(commit_msg.roles.clone())? }); let parsed_roles = roles_only_map.extract_roles()?; // let roles_commitment = roles_only_map.hash_fields(root_commitment)?; @@ -77,7 +83,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap(); - if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, process_id) { + if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, commit_msg.process_id) { dump_cached_members()?; // Send a handshake message to every connected client if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) { @@ -108,9 +114,9 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re } } - let mut new_process = Process::new(process_id); + let mut new_process = Process::new(commit_msg.process_id); let init_state = ProcessState { - commited_in: process_id, + commited_in: commit_msg.process_id, encrypted_pcd: roles_only_map, pcd_commitment: commit_msg.pcd_commitment.clone(), state_id: merkle_root_bin.to_lower_hex_string(), @@ -119,43 +125,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re new_process.insert_concurrent_state(init_state)?; - // Cache the process - lock_processes()?.insert( - process_id, - new_process.clone(), - ); - - // Dump to disk - dump_cached_processes()?; - - // Add to frozen UTXOs - lock_freezed_utxos()?.insert(process_id); - - // Send an update to all connected client - let our_sp_address = WALLET - .get() - .ok_or(Error::msg("Wallet not initialized"))? - .get_wallet()? - .get_client() - .get_receiving_address(); - let mut new_process_map = HashMap::new(); - new_process_map.insert(process_id, new_process); - let init_msg = HandshakeMessage::new( - our_sp_address, - OutPointMemberMap(HashMap::new()), - OutPointProcessMap(new_process_map), - ); - - if let Err(e) = broadcast_message( - AnkFlag::Handshake, - format!("{}", init_msg.to_string()), - BroadcastType::ToAll - ) - { - log::error!("Failed to send handshake message: {}", e); - } - - Ok(process_id) + Ok(new_process) } pub static MEMBERLIST: OnceLock>> = OnceLock::new(); @@ -195,26 +165,20 @@ fn handle_member_list(roles: &HashMap, process_id: OutPo Err(Error::msg("Process is not a pairing process")) } -// Handle the case where `init_tx` is a reference to an existing outpoint -fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result { - let mut processes = lock_processes()?; - let process = processes - .get_mut(&outpoint) - .ok_or(Error::msg(format!("Commitment not found: {}", outpoint)))?; - - match register_new_state(process, &commit_msg) { - Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", outpoint, new_state_id), +fn handle_existing_commitment(process_to_udpate: &mut Process, commit_msg: &CommitMessage) -> Result<()> { + let process_id = process_to_udpate.get_process_id()?; + match register_new_state(process_to_udpate, &commit_msg) { + Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", process_id, new_state_id), Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id) } if commit_msg.validation_tokens.len() > 0 { - log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), outpoint); + log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), process_id); // If the validation succeed, we return a new tip - process_validation(process, commit_msg) - } else { - // We just return the current outpoint - process.get_process_tip() + process_validation(process_to_udpate, commit_msg)?; } + + Ok(()) } pub fn dump_cached_members() -> Result<(), anyhow::Error> { @@ -233,9 +197,7 @@ pub fn dump_cached_members() -> Result<(), anyhow::Error> { Ok(()) } -pub fn dump_cached_processes() -> Result<(), anyhow::Error> { - let processes = lock_processes()?.clone(); - +pub fn dump_cached_processes(processes: HashMap) -> Result<(), anyhow::Error> { let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?; let processes_file = &storage.processes_file; @@ -277,7 +239,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu .iter() .any(|state| state.state_id == new_state_id) { - return Err(Error::msg(new_state_id)); + return Err(Error::msg("Can't add an already existing state")); } // Add the new state @@ -295,7 +257,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu } // Process validation for a state with validation tokens -fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) -> Result { +fn process_validation(updated_process: &mut Process, commit_msg: &CommitMessage) -> Result { let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))?; let new_state_id_hex = new_state_id.to_lower_hex_string(); let mut state_to_validate = updated_process @@ -305,7 +267,7 @@ fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) .ok_or(Error::msg("Unknown state"))? .clone(); - state_to_validate.validation_tokens = commit_msg.validation_tokens; + state_to_validate.validation_tokens = commit_msg.validation_tokens.clone(); state_to_validate.is_valid(updated_process.get_latest_commited_state())?; let commited_in = commit_new_transaction(updated_process, state_to_validate)?;