Refactor commit
This commit is contained in:
parent
dcf76a6df6
commit
be0ba64eef
162
src/commit.rs
162
src/commit.rs
@ -1,4 +1,3 @@
|
||||
use std::str::FromStr;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Mutex, MutexGuard, OnceLock},
|
||||
@ -13,8 +12,7 @@ use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap};
|
||||
use sdk_common::silentpayments::create_transaction;
|
||||
use sdk_common::sp_client::spclient::Recipient;
|
||||
use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage};
|
||||
use sdk_common::sp_client::bitcoin::consensus::deserialize;
|
||||
use sdk_common::sp_client::bitcoin::{Amount, Transaction, OutPoint};
|
||||
use sdk_common::sp_client::bitcoin::{Amount, OutPoint};
|
||||
use sdk_common::process::{lock_processes, Process, ProcessState};
|
||||
use serde_json::json;
|
||||
|
||||
@ -22,48 +20,56 @@ use crate::message::{broadcast_message, BroadcastType};
|
||||
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
|
||||
|
||||
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||
// Attempt to process the initial transaction or reference
|
||||
match parse_initial_tx(&commit_msg.init_tx)? {
|
||||
InitialTx::Transaction(tx) => handle_initial_transaction(tx, &commit_msg),
|
||||
InitialTx::OutPoint(outpoint) => handle_existing_commitment(outpoint, commit_msg),
|
||||
}
|
||||
}
|
||||
|
||||
// Enum to differentiate between parsed transaction and outpoint
|
||||
enum InitialTx {
|
||||
Transaction(Transaction),
|
||||
OutPoint(OutPoint),
|
||||
}
|
||||
|
||||
// Parse the initial transaction as either a `Transaction` or `OutPoint`
|
||||
fn parse_initial_tx(init_tx: &str) -> Result<InitialTx> {
|
||||
if let Ok(tx_hex) = Vec::from_hex(init_tx) {
|
||||
let tx = deserialize::<Transaction>(&tx_hex)?;
|
||||
Ok(InitialTx::Transaction(tx))
|
||||
} else if let Ok(outpoint) = OutPoint::from_str(init_tx) {
|
||||
Ok(InitialTx::OutPoint(outpoint))
|
||||
let mut processes = lock_processes()?;
|
||||
if let Some(process) = processes.get_mut(&commit_msg.process_id) {
|
||||
handle_existing_commitment(process, &commit_msg)?;
|
||||
} else {
|
||||
Err(Error::msg("init_tx must be a valid transaction or txid"))
|
||||
let new_process = handle_new_process(&commit_msg)?;
|
||||
// Cache the process
|
||||
processes.insert(
|
||||
commit_msg.process_id,
|
||||
new_process,
|
||||
);
|
||||
}
|
||||
|
||||
// Dump to disk
|
||||
dump_cached_processes(processes.clone())?;
|
||||
|
||||
// Add to frozen UTXOs
|
||||
lock_freezed_utxos()?.insert(commit_msg.process_id);
|
||||
|
||||
// Update processes with the
|
||||
// Send an update to all connected client
|
||||
let our_sp_address = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.get_wallet()?
|
||||
.get_client()
|
||||
.get_receiving_address();
|
||||
let mut new_process_map = HashMap::new();
|
||||
let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
|
||||
new_process_map.insert(commit_msg.process_id, new_process);
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address,
|
||||
OutPointMemberMap(HashMap::new()),
|
||||
OutPointProcessMap(new_process_map),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Handshake,
|
||||
format!("{}", init_msg.to_string()),
|
||||
BroadcastType::ToAll
|
||||
)
|
||||
{
|
||||
log::error!("Failed to send handshake message: {}", e);
|
||||
}
|
||||
|
||||
Ok(commit_msg.process_id)
|
||||
}
|
||||
|
||||
// Handle the case where `init_tx` is a new transaction
|
||||
fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Result<OutPoint> {
|
||||
let process_id = OutPoint::new(tx.txid(), 0);
|
||||
|
||||
// Validation tokens must be empty for the initial transaction
|
||||
if !commit_msg.validation_tokens.is_empty() {
|
||||
return Err(anyhow::Error::msg(
|
||||
"Validation tokens must be empty".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Broadcast the transaction
|
||||
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||
daemon.broadcast(&tx)?;
|
||||
|
||||
fn handle_new_process(commit_msg: &CommitMessage) -> Result<Process> {
|
||||
// Process roles and commitments
|
||||
let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? });
|
||||
let roles_only_map = json!({ "roles": serde_json::to_value(commit_msg.roles.clone())? });
|
||||
|
||||
let parsed_roles = roles_only_map.extract_roles()?;
|
||||
// let roles_commitment = roles_only_map.hash_fields(root_commitment)?;
|
||||
@ -77,7 +83,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
|
||||
|
||||
let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap();
|
||||
|
||||
if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, process_id) {
|
||||
if let Ok(pairing_process_id) = handle_member_list(&parsed_roles, commit_msg.process_id) {
|
||||
dump_cached_members()?;
|
||||
// Send a handshake message to every connected client
|
||||
if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) {
|
||||
@ -108,9 +114,9 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
|
||||
}
|
||||
}
|
||||
|
||||
let mut new_process = Process::new(process_id);
|
||||
let mut new_process = Process::new(commit_msg.process_id);
|
||||
let init_state = ProcessState {
|
||||
commited_in: process_id,
|
||||
commited_in: commit_msg.process_id,
|
||||
encrypted_pcd: roles_only_map,
|
||||
pcd_commitment: commit_msg.pcd_commitment.clone(),
|
||||
state_id: merkle_root_bin.to_lower_hex_string(),
|
||||
@ -119,43 +125,7 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
|
||||
|
||||
new_process.insert_concurrent_state(init_state)?;
|
||||
|
||||
// Cache the process
|
||||
lock_processes()?.insert(
|
||||
process_id,
|
||||
new_process.clone(),
|
||||
);
|
||||
|
||||
// Dump to disk
|
||||
dump_cached_processes()?;
|
||||
|
||||
// Add to frozen UTXOs
|
||||
lock_freezed_utxos()?.insert(process_id);
|
||||
|
||||
// Send an update to all connected client
|
||||
let our_sp_address = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.get_wallet()?
|
||||
.get_client()
|
||||
.get_receiving_address();
|
||||
let mut new_process_map = HashMap::new();
|
||||
new_process_map.insert(process_id, new_process);
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address,
|
||||
OutPointMemberMap(HashMap::new()),
|
||||
OutPointProcessMap(new_process_map),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Handshake,
|
||||
format!("{}", init_msg.to_string()),
|
||||
BroadcastType::ToAll
|
||||
)
|
||||
{
|
||||
log::error!("Failed to send handshake message: {}", e);
|
||||
}
|
||||
|
||||
Ok(process_id)
|
||||
Ok(new_process)
|
||||
}
|
||||
|
||||
pub static MEMBERLIST: OnceLock<Mutex<HashMap<OutPoint, Member>>> = OnceLock::new();
|
||||
@ -195,26 +165,20 @@ fn handle_member_list(roles: &HashMap<String, RoleDefinition>, process_id: OutPo
|
||||
Err(Error::msg("Process is not a pairing process"))
|
||||
}
|
||||
|
||||
// Handle the case where `init_tx` is a reference to an existing outpoint
|
||||
fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||
let mut processes = lock_processes()?;
|
||||
let process = processes
|
||||
.get_mut(&outpoint)
|
||||
.ok_or(Error::msg(format!("Commitment not found: {}", outpoint)))?;
|
||||
|
||||
match register_new_state(process, &commit_msg) {
|
||||
Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", outpoint, new_state_id),
|
||||
fn handle_existing_commitment(process_to_udpate: &mut Process, commit_msg: &CommitMessage) -> Result<()> {
|
||||
let process_id = process_to_udpate.get_process_id()?;
|
||||
match register_new_state(process_to_udpate, &commit_msg) {
|
||||
Ok(new_state_id) => log::debug!("Registering new state for process {} with state id {}", process_id, new_state_id),
|
||||
Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id)
|
||||
}
|
||||
|
||||
if commit_msg.validation_tokens.len() > 0 {
|
||||
log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), outpoint);
|
||||
log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), process_id);
|
||||
// If the validation succeed, we return a new tip
|
||||
process_validation(process, commit_msg)
|
||||
} else {
|
||||
// We just return the current outpoint
|
||||
process.get_process_tip()
|
||||
process_validation(process_to_udpate, commit_msg)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dump_cached_members() -> Result<(), anyhow::Error> {
|
||||
@ -233,9 +197,7 @@ pub fn dump_cached_members() -> Result<(), anyhow::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dump_cached_processes() -> Result<(), anyhow::Error> {
|
||||
let processes = lock_processes()?.clone();
|
||||
|
||||
pub fn dump_cached_processes(processes: HashMap<OutPoint, Process>) -> Result<(), anyhow::Error> {
|
||||
let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?;
|
||||
|
||||
let processes_file = &storage.processes_file;
|
||||
@ -277,7 +239,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu
|
||||
.iter()
|
||||
.any(|state| state.state_id == new_state_id)
|
||||
{
|
||||
return Err(Error::msg(new_state_id));
|
||||
return Err(Error::msg("Can't add an already existing state"));
|
||||
}
|
||||
|
||||
// Add the new state
|
||||
@ -295,7 +257,7 @@ fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Resu
|
||||
}
|
||||
|
||||
// Process validation for a state with validation tokens
|
||||
fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||
fn process_validation(updated_process: &mut Process, commit_msg: &CommitMessage) -> Result<OutPoint> {
|
||||
let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))?;
|
||||
let new_state_id_hex = new_state_id.to_lower_hex_string();
|
||||
let mut state_to_validate = updated_process
|
||||
@ -305,7 +267,7 @@ fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage)
|
||||
.ok_or(Error::msg("Unknown state"))?
|
||||
.clone();
|
||||
|
||||
state_to_validate.validation_tokens = commit_msg.validation_tokens;
|
||||
state_to_validate.validation_tokens = commit_msg.validation_tokens.clone();
|
||||
state_to_validate.is_valid(updated_process.get_latest_commited_state())?;
|
||||
|
||||
let commited_in = commit_new_transaction(updated_process, state_to_validate)?;
|
||||
|
Loading…
x
Reference in New Issue
Block a user