Refactor commit

This commit is contained in:
NicolasCantu 2025-02-05 14:36:33 +01:00
parent efa1129e45
commit fd8c40e09a

View File

@ -1,4 +1,3 @@
use std::str::FromStr;
use std::{
collections::HashMap,
sync::{Mutex, MutexGuard, OnceLock},
@ -13,8 +12,7 @@ use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap};
use sdk_common::silentpayments::create_transaction;
use sdk_common::sp_client::spclient::Recipient;
use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage};
use sdk_common::sp_client::bitcoin::consensus::deserialize;
use sdk_common::sp_client::bitcoin::{Amount, Transaction, OutPoint};
use sdk_common::sp_client::bitcoin::{Amount, OutPoint};
use sdk_common::process::{lock_processes, Process, ProcessState};
use serde_json::json;
@ -22,48 +20,56 @@ use crate::message::{broadcast_message, BroadcastType};
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
// Attempt to process the initial transaction or reference
match parse_initial_tx(&commit_msg.init_tx)? {
InitialTx::Transaction(tx) => handle_initial_transaction(tx, &commit_msg),
InitialTx::OutPoint(outpoint) => handle_existing_commitment(outpoint, commit_msg),
}
}
// Enum to differentiate between parsed transaction and outpoint
enum InitialTx {
Transaction(Transaction),
OutPoint(OutPoint),
}
// Parse the initial transaction as either a `Transaction` or `OutPoint`
fn parse_initial_tx(init_tx: &str) -> Result<InitialTx> {
if let Ok(tx_hex) = Vec::from_hex(init_tx) {
let tx = deserialize::<Transaction>(&tx_hex)?;
Ok(InitialTx::Transaction(tx))
} else if let Ok(outpoint) = OutPoint::from_str(init_tx) {
Ok(InitialTx::OutPoint(outpoint))
let mut processes = lock_processes()?;
if let Some(process) = processes.get_mut(&commit_msg.process_id) {
handle_existing_commitment(process, &commit_msg)?;
} else {
Err(Error::msg("init_tx must be a valid transaction or txid"))
let new_process = handle_new_process(&commit_msg)?;
// Cache the process
processes.insert(
commit_msg.process_id,
new_process,
);
}
// Dump to disk
dump_cached_processes(processes.clone())?;
// Add to frozen UTXOs
lock_freezed_utxos()?.insert(commit_msg.process_id);
// Update processes with the
// Send an update to all connected client
let our_sp_address = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.get_wallet()?
.get_client()
.get_receiving_address();
let mut new_process_map = HashMap::new();
let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
new_process_map.insert(commit_msg.process_id, new_process);
let init_msg = HandshakeMessage::new(
our_sp_address,
OutPointMemberMap(HashMap::new()),
OutPointProcessMap(new_process_map),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::ToAll
)
{
log::error!("Failed to send handshake message: {}", e);
}
Ok(commit_msg.process_id)
}
// Handle the case where `init_tx` is a new transaction
fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Result<OutPoint> {
let process_id = OutPoint::new(tx.txid(), 0);
// Validation tokens must be empty for the initial transaction
if !commit_msg.validation_tokens.is_empty() {
return Err(anyhow::Error::msg(
"Validation tokens must be empty".to_string(),
));
}
// Broadcast the transaction
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
daemon.broadcast(&tx)?;
fn handle_new_process(commit_msg: &CommitMessage) -> Result<Process> {
// Process roles and commitments
let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? });
let roles_only_map = json!({ "roles": serde_json::to_value(commit_msg.roles.clone())? });
let parsed_roles = roles_only_map.extract_roles()?;
// let roles_commitment = roles_only_map.hash_fields(root_commitment)?;
@ -77,7 +83,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap();
if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, process_id) {
if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, commit_msg.process_id) {
dump_cached_members()?;
// Send a handshake message to every connected client
if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) {
@ -108,9 +114,9 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
}
}
let mut new_process = Process::new(process_id);
let mut new_process = Process::new(commit_msg.process_id);
let init_state = ProcessState {
commited_in: process_id,
commited_in: commit_msg.process_id,
encrypted_pcd: roles_only_map,
pcd_commitment: commit_msg.pcd_commitment.clone(),
state_id: merkle_root_bin.to_lower_hex_string(),
@ -119,43 +125,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
new_process.insert_concurrent_state(init_state)?;
// Cache the process
lock_processes()?.insert(
process_id,
new_process.clone(),
);
// Dump to disk
dump_cached_processes()?;
// Add to frozen UTXOs
lock_freezed_utxos()?.insert(process_id);
// Send an update to all connected client
let our_sp_address = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.get_wallet()?
.get_client()
.get_receiving_address();
let mut new_process_map = HashMap::new();
new_process_map.insert(process_id, new_process);
let init_msg = HandshakeMessage::new(
our_sp_address,
OutPointMemberMap(HashMap::new()),
OutPointProcessMap(new_process_map),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::ToAll
)
{
log::error!("Failed to send handshake message: {}", e);
}
Ok(process_id)
Ok(new_process)
}
pub static MEMBERLIST: OnceLock<Mutex<HashMap<OutPoint, Member>>> = OnceLock::new();
@ -195,26 +165,20 @@ fn handle_member_list(roles: &HashMap<String, RoleDefinition>, process_id: OutPo
Err(Error::msg("Process is not a pairing process"))
}
// Handle the case where `init_tx` is a reference to an existing outpoint
fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result<OutPoint> {
let mut processes = lock_processes()?;
let process = processes
.get_mut(&outpoint)
.ok_or(Error::msg(format!("Commitment not found: {}", outpoint)))?;
match register_new_state(process, &commit_msg) {
Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", outpoint, new_state_id),
fn handle_existing_commitment(process_to_udpate: &mut Process, commit_msg: &CommitMessage) -> Result<()> {
let process_id = process_to_udpate.get_process_id()?;
match register_new_state(process_to_udpate, &commit_msg) {
Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", process_id, new_state_id),
Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id)
}
if commit_msg.validation_tokens.len() > 0 {
log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), outpoint);
log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), process_id);
// If the validation succeed, we return a new tip
process_validation(process, commit_msg)
} else {
// We just return the current outpoint
process.get_process_tip()
process_validation(process_to_udpate, commit_msg)?;
}
Ok(())
}
pub fn dump_cached_members() -> Result<(), anyhow::Error> {
@ -233,9 +197,7 @@ pub fn dump_cached_members() -> Result<(), anyhow::Error> {
Ok(())
}
pub fn dump_cached_processes() -> Result<(), anyhow::Error> {
let processes = lock_processes()?.clone();
pub fn dump_cached_processes(processes: HashMap<OutPoint, Process>) -> Result<(), anyhow::Error> {
let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?;
let processes_file = &storage.processes_file;
@ -277,7 +239,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu
.iter()
.any(|state| state.state_id == new_state_id)
{
return Err(Error::msg(new_state_id));
return Err(Error::msg("Can't add an already existing state"));
}
// Add the new state
@ -295,7 +257,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu
}
// Process validation for a state with validation tokens
fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) -> Result<OutPoint> {
fn process_validation(updated_process: &mut Process, commit_msg: &CommitMessage) -> Result<OutPoint> {
let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))?;
let new_state_id_hex = new_state_id.to_lower_hex_string();
let mut state_to_validate = updated_process
@ -305,7 +267,7 @@ fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage)
.ok_or(Error::msg("Unknown state"))?
.clone();
state_to_validate.validation_tokens = commit_msg.validation_tokens;
state_to_validate.validation_tokens = commit_msg.validation_tokens.clone();
state_to_validate.is_valid(updated_process.get_latest_commited_state())?;
let commited_in = commit_new_transaction(updated_process, state_to_validate)?;