// sdk_relay/src/commit.rs (611 lines, 21 KiB, Rust)

use std::str::FromStr;
use std::{
collections::HashMap,
sync::{Mutex, MutexGuard, OnceLock},
};
use anyhow::{Error, Result};
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
use hex::FromHex;
use sdk_common::pcd::{Member, Pcd, RoleDefinition};
use sdk_common::serialization::{MemberOutPointMap, OutPointProcessMap};
use sdk_common::silentpayments::create_transaction;
use sdk_common::sp_client::spclient::Recipient;
use sdk_common::network::CommitMessage;
use sdk_common::sp_client::bitcoin::consensus::deserialize;
use sdk_common::sp_client::bitcoin::{Amount, Transaction, OutPoint};
use sdk_common::process::{lock_processes, Process, ProcessState};
use serde_json::json;
use serde_json::Value;
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
/// Entry point for an incoming commit message.
///
/// The `init_tx` field is parsed with `parse_initial_tx`, then dispatched:
/// * a full transaction goes to `handle_initial_transaction`;
/// * an outpoint goes to `handle_existing_commitment`.
///
/// Returns the outpoint produced by the selected handler. A parse failure is
/// propagated to the caller unchanged.
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
    // Parsing yields an owned value, so `commit_msg` can still be moved below
    let parsed = parse_initial_tx(&commit_msg.init_tx)?;
    match parsed {
        InitialTx::OutPoint(outpoint) => handle_existing_commitment(outpoint, commit_msg),
        InitialTx::Transaction(tx) => handle_initial_transaction(tx, &commit_msg),
    }
}
/// Result of parsing the `init_tx` field of a commit message: either a complete
/// transaction or a reference to an existing outpoint.
enum InitialTx {
/// `init_tx` was hex that deserialized into a transaction
Transaction(Transaction),
/// `init_tx` parsed as the string form of an outpoint
OutPoint(OutPoint),
}
// Parse the initial transaction as either a `Transaction` or `OutPoint`
fn parse_initial_tx(init_tx: &str) -> Result<InitialTx> {
if let Ok(tx_hex) = Vec::from_hex(init_tx) {
let tx = deserialize::<Transaction>(&tx_hex)?;
Ok(InitialTx::Transaction(tx))
} else if let Ok(outpoint) = OutPoint::from_str(init_tx) {
Ok(InitialTx::OutPoint(outpoint))
} else {
Err(Error::msg("init_tx must be a valid transaction or txid"))
}
}
// Handle the case where `init_tx` is a new transaction
fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Result<OutPoint> {
let root_commitment = OutPoint::new(tx.txid(), 0);
// Validation tokens must be empty for the initial transaction
if !commit_msg.validation_tokens.is_empty() {
return Err(anyhow::Error::msg(
"Validation tokens must be empty".to_string(),
));
}
// Broadcast the transaction
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
daemon.broadcast(&tx)?;
// Process roles and commitments
let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? });
let parsed_roles = roles_only_map.extract_roles()?;
// let roles_commitment = roles_only_map.hash_fields(root_commitment)?;
// TODO make that kind of check reliable, needs more work on json serialization
// if roles_commitment.get("roles") != commit_msg.pcd_commitment.get("roles") {
// return Err(Error::msg("Role commitment mismatch"));
// }
let pcd_commitment = &commit_msg.pcd_commitment;
let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap();
match handle_member_list(pcd_commitment, &parsed_roles, root_commitment) {
Ok(()) => {
dump_cached_members()?;
}
Err(e) => log::error!("{}", e)
}
let mut new_process = Process::new(root_commitment);
let init_state = ProcessState {
commited_in: root_commitment,
encrypted_pcd: roles_only_map,
pcd_commitment: commit_msg.pcd_commitment.clone(),
state_id: merkle_root_bin.to_lower_hex_string(),
..Default::default()
};
new_process.insert_concurrent_state(init_state)?;
// Cache the process
lock_processes()?.insert(
root_commitment,
new_process,
);
// Dump to disk
dump_cached_processes()?;
// Add to frozen UTXOs
lock_freezed_utxos()?.insert(root_commitment);
Ok(root_commitment)
}
/// Process-wide cache mapping each registered `Member` to an `OutPoint`.
/// It is created lazily (empty) on first use by `lock_members`, which is the
/// accessor this file uses to reach it.
pub static MEMBERLIST: OnceLock<Mutex<HashMap<Member, OutPoint>>> = OnceLock::new();
/// Lock the global member list and return its guard.
///
/// The list is initialized to an empty map on first access. Any error reported by
/// `lock_anyhow` is returned to the caller.
pub fn lock_members() -> Result<MutexGuard<'static, HashMap<Member, OutPoint>>, anyhow::Error> {
    let member_cache = MEMBERLIST.get_or_init(|| Mutex::new(HashMap::new()));
    member_cache.lock_anyhow()
}
fn handle_member_list(pcd_commitment: &Value, roles: &HashMap<String, RoleDefinition>, root_commitment: OutPoint) -> Result<()> {
//Check if the keys exists in the pcd
if let Value::Object(ref pcd_map) = pcd_commitment {
if !pcd_map.contains_key("key_parity")
|| !pcd_map.contains_key("session_privkey")
|| !pcd_map.contains_key("session_pubkey")
{
return Err(Error::msg("Process is not a pairing process"));
}
} else {
return Err(Error::msg("Pcd is missing"));
}
//Check if there is one role with one member
if roles.len() != 1 {
return Err(Error::msg("Process is not a pairing process"));
}
if let Some(owner_role) = roles.get("owner") {
if owner_role.members.len() == 1 {
let member = owner_role.members.get(0).unwrap();
let mut memberlist = lock_members()?;
memberlist.insert(
member.clone(),
root_commitment,
);
return Ok(());
}
}
Err(Error::msg("Process is not a pairing process"))
}
// Handle the case where `init_tx` is a reference to an existing outpoint
fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result<OutPoint> {
let mut processes = lock_processes()?;
let process = processes
.get_mut(&outpoint)
.ok_or(Error::msg(format!("Commitment not found: {}", outpoint)))?;
if commit_msg.validation_tokens.is_empty() {
register_new_state(process, commit_msg)
} else {
log::debug!("Received commit_msg with {} validation tokens for process {}", commit_msg.validation_tokens.len(), outpoint);
process_validation(process, commit_msg)
}
}
pub fn dump_cached_members() -> Result<(), anyhow::Error> {
let members = lock_members()?.clone();
let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?;
let members_file = &storage.members_file;
let members_map = MemberOutPointMap(members);
let json = serde_json::to_value(&members_map)?;
members_file.save(&json)?;
log::debug!("saved members");
Ok(())
}
pub fn dump_cached_processes() -> Result<(), anyhow::Error> {
let processes = lock_processes()?.clone();
let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?;
let processes_file = &storage.processes_file;
let outpoints_map = OutPointProcessMap(processes);
let json = serde_json::to_value(&outpoints_map)?;
processes_file.save(&json)?;
log::debug!("saved processes");
Ok(())
}
// Register a new state when validation tokens are empty
fn register_new_state(commitment: &mut Process, commit_msg: CommitMessage) -> Result<OutPoint> {
let concurrent_states = commitment.get_latest_concurrent_states()?;
let (empty_state, actual_states) = concurrent_states.split_last().unwrap();
let current_outpoint = empty_state.commited_in;
let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().unwrap().to_lower_hex_string();
// Ensure no duplicate states
if actual_states
.iter()
.any(|state| state.state_id == new_state_id)
{
return Err(Error::msg("Proposed state already exists"));
}
// Add the new state
let roles_only_map = json!({ "roles": serde_json::to_value(&commit_msg.roles)? });
let new_state = ProcessState {
commited_in: current_outpoint,
pcd_commitment: commit_msg.pcd_commitment,
encrypted_pcd: roles_only_map,
state_id: new_state_id,
..Default::default()
};
commitment.insert_concurrent_state(new_state)?;
Ok(current_outpoint)
}
// Process validation for a state with validation tokens
fn process_validation(updated_process: &mut Process, commit_msg: CommitMessage) -> Result<OutPoint> {
let new_state_id = commit_msg.pcd_commitment.create_merkle_tree()?.root().ok_or(Error::msg("Invalid merkle tree"))?;
let new_state_id_hex = new_state_id.to_lower_hex_string();
let mut state_to_validate = updated_process
.get_latest_concurrent_states()?
.into_iter()
.find(|state| state.state_id == new_state_id_hex)
.ok_or(Error::msg("Unknown state"))?
.clone();
state_to_validate.validation_tokens = commit_msg.validation_tokens;
state_to_validate.is_valid(updated_process.get_latest_commited_state())?;
let commited_in = commit_new_transaction(updated_process, state_to_validate)?;
Ok(commited_in)
}
// Commit the new transaction and update the process state
fn commit_new_transaction(
updated_process: &mut Process,
state_to_commit: ProcessState,
) -> Result<OutPoint> {
let sp_wallet = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.get_wallet()?;
let recipient = Recipient {
address: sp_wallet.get_client().get_receiving_address(),
amount: Amount::from_sat(1000),
nb_outputs: 1,
};
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
let fee_rate = daemon.estimate_fee(6)
.unwrap_or(Amount::from_sat(1000))
.checked_div(1000)
.unwrap();
let mut freezed_utxos = lock_freezed_utxos()?;
let next_commited_in = updated_process.get_process_tip()?;
let mandatory_input = if let Some(next_outpoint) = freezed_utxos.take(&next_commited_in) {
next_outpoint
} else {
return Err(Error::msg(format!("Missing next commitment outpoint for process {}", updated_process.get_process_id()?)));
};
let commitment_payload = Vec::from_hex(state_to_commit.state_id.clone())?;
let psbt = create_transaction(
vec![mandatory_input],
&freezed_utxos,
&sp_wallet,
vec![recipient],
Some(commitment_payload),
fee_rate,
None,
)?;
let new_tx = psbt.extract_tx()?;
let txid = daemon.broadcast(&new_tx)?;
let commited_in = OutPoint::new(txid, 0);
freezed_utxos.insert(commited_in);
updated_process.remove_all_concurrent_states()?;
updated_process.insert_concurrent_state(state_to_commit)?;
updated_process.update_states_tip(commited_in)?;
Ok(commited_in)
}
// Unit tests for the commit handling entry point. The daemon is replaced by a
// mockall mock and the process cache is initialized in memory.
#[cfg(test)]
mod tests {
use super::*;
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
use sdk_common::pcd::Member;
use sdk_common::pcd::RoleDefinition;
use sdk_common::pcd::ValidationRule;
use sdk_common::process::CACHEDPROCESSES;
use sdk_common::sp_client::silentpayments::utils::SilentPaymentAddress;
use serde_json::json;
use mockall::predicate::*;
use mockall::mock;
use std::collections::HashMap;
use std::sync::Mutex;
use bitcoincore_rpc::bitcoin::*;
use crate::daemon::RpcCall;
use std::sync::OnceLock;
use sdk_common::sp_client::bitcoin::consensus::serialize;
use serde_json::{Map, Value};
// Silent payment address of the single member built by `mock_commit_msg`
const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v";
// Hex-encoded transaction used as the initial transaction by both tests; it is
// also the only transaction the mocked `broadcast` expectation accepts
const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000";
// Define the mock for Daemon with the required methods
mock! {
#[derive(Debug)]
pub Daemon {}
impl RpcCall for Daemon {
fn connect(
rpcwallet: Option<String>,
rpc_url: String,
network: bitcoincore_rpc::bitcoin::Network,
) -> Result<Self> where Self: Sized;
fn estimate_fee(&self, nblocks: u16) -> Result<Amount>;
fn get_relay_fee(&self) -> Result<Amount>;
fn get_current_height(&self) -> Result<u64>;
fn get_block(&self, block_hash: BlockHash) -> Result<Block>;
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, bip158::BlockFilter)>;
fn list_unspent_from_to(
&self,
minamt: Option<Amount>,
) -> Result<Vec<bitcoincore_rpc::json::ListUnspentResultEntry>>;
fn create_psbt(
&self,
unspents: &[bitcoincore_rpc::json::ListUnspentResultEntry],
spk: ScriptBuf,
network: Network,
) -> Result<String>;
fn process_psbt(&self, psbt: String) -> Result<String>;
fn finalize_psbt(&self, psbt: String) -> Result<String>;
fn get_network(&self) -> Result<Network>;
fn test_mempool_accept(
&self,
tx: &Transaction,
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult>;
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
fn get_transaction_info(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Value>;
fn get_transaction_hex(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Value>;
fn get_transaction(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Transaction>;
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>>;
fn get_mempool_txids(&self) -> Result<Vec<Txid>>;
fn get_mempool_entries(
&self,
txids: &[Txid],
) -> Result<Vec<Result<bitcoincore_rpc::json::GetMempoolEntryResult>>>;
fn get_mempool_transactions(
&self,
txids: &[Txid],
) -> Result<Vec<Result<Transaction>>>;
}
}
// Mock exposing only `get_receiving_address`
mock! {
#[derive(Debug)]
pub SpWallet {
fn get_receiving_address(&self) -> Result<String>;
}
}
// Mock wallet whose `get_sp_wallet` hands out a `MockSpWallet`
mock! {
#[derive(Debug)]
pub SilentPaymentWallet {
fn get_sp_wallet(&self) -> Result<MockSpWallet>;
}
}
// Test-local wallet static.
// NOTE(review): this is a separate item from the `WALLET` imported by the parent
// module, so the functions under test presumably never see it - confirm intent.
static WALLET: OnceLock<MockSilentPaymentWallet> = OnceLock::new();
// Set up the shared statics once: a mocked daemon that accepts broadcasting
// INIT_TRANSACTION only, the test-local wallet, and an empty process cache.
// Each static is only set when still unset, so every test can call this.
pub fn initialize_static_variables() {
if DAEMON.get().is_none() {
let mut daemon = MockDaemon::new();
daemon.expect_broadcast()
.withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION)
.returning(|tx| Ok(tx.txid()));
DAEMON.set(Mutex::new(Box::new(daemon))).expect("DAEMON should only be initialized once");
println!("Initialized DAEMON");
}
if WALLET.get().is_none() {
let mut wallet = MockSilentPaymentWallet::new();
wallet.expect_get_sp_wallet().returning(|| Ok(MockSpWallet::new()));
WALLET.set(wallet).expect("WALLET should only be initialized once");
println!("Initialized WALLET");
}
if CACHEDPROCESSES.get().is_none() {
CACHEDPROCESSES
.set(Mutex::new(HashMap::new()))
.expect("CACHEDPROCESSES should only be initialized once");
println!("Initialized CACHEDPROCESSES");
}
}
// Build a commit message with a single role ("role_name") holding one member,
// no validation tokens and a one-field commitment on "roles".
// When `first` is true, `init_tx` is the hex serialization of the transaction;
// otherwise it is the string form of output 0 of that transaction.
fn mock_commit_msg(init_tx: Transaction, first: bool) -> CommitMessage {
let field_name = "roles".to_owned();
let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]).unwrap();
let validation_rule = ValidationRule::new(1.0, vec![field_name.clone()], 1.0).unwrap();
let role_def = RoleDefinition {
members: vec![member],
validation_rules: vec![validation_rule],
storages: vec![],
};
let roles = HashMap::from([(String::from("role_name"), role_def)]);
let pcd_commitment = json!({field_name: "b30212b9649054b71f938fbe0d1c08e72de95bdb12b8008082795c6e9c4ad26a"});
let init_tx = if first { serialize(&init_tx).to_lower_hex_string() } else { OutPoint::new(init_tx.txid(), 0).to_string() };
let commit_msg = CommitMessage {
init_tx,
roles: roles.clone(),
validation_tokens: vec![],
pcd_commitment: pcd_commitment.clone(),
error: None,
};
commit_msg
}
// A commit message carrying a full transaction must create a process rooted at
// output 0 of that transaction and cache it.
// NOTE(review): the expected `new_state` leaves `state_id` at its default while
// `handle_initial_transaction` sets it from the merkle root - confirm how
// `ProcessState` equality treats `state_id`.
#[test]
fn test_handle_commit_new_process() {
initialize_static_variables();
let init_tx = deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
let init_txid = init_tx.txid();
let init_commitment = OutPoint::new(init_txid, 0);
let commit_msg = mock_commit_msg(init_tx, true);
let roles = commit_msg.roles.clone();
let pcd_commitment = commit_msg.pcd_commitment.clone();
let empty_state = ProcessState {
commited_in: init_commitment,
..Default::default()
};
let result = handle_commit_request(commit_msg);
assert_eq!(result.unwrap(), init_commitment);
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
let updated_process = cache.get(&init_commitment);
assert!(updated_process.is_some());
let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap();
// Constructing the roles_map that was inserted in the process
let roles_object = serde_json::to_value(roles).unwrap();
let mut roles_map = Map::new();
roles_map.insert("roles".to_owned(), roles_object);
let new_state = ProcessState {
commited_in: init_commitment,
pcd_commitment,
encrypted_pcd: Value::Object(roles_map),
..Default::default()
};
let target = vec![&empty_state, &new_state];
assert_eq!(concurrent_states, target);
}
// A commit message referencing an existing outpoint (no validation tokens) must
// add a new concurrent state to the process already present in the cache.
// NOTE(review): as above, the expected `new_state` has no `state_id` although
// `register_new_state` sets one - confirm.
#[test]
fn test_handle_commit_new_state() {
initialize_static_variables();
let init_tx = deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
let init_txid = init_tx.txid();
let init_commitment = OutPoint::new(init_txid, 0);
let commit_msg = mock_commit_msg(init_tx, false);
let roles = commit_msg.roles.clone();
let pcd_commitment = commit_msg.pcd_commitment.clone();
// Seed the cache with a fresh process so the outpoint lookup succeeds
let process = Process::new(init_commitment);
CACHEDPROCESSES.get().unwrap().lock().unwrap().insert(init_commitment, process);
let result = handle_commit_request(commit_msg);
assert_eq!(result.unwrap(), init_commitment);
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
let updated_process = cache.get(&init_commitment);
assert!(updated_process.is_some());
let concurrent_states = updated_process.unwrap().get_latest_concurrent_states().unwrap();
let roles_object = serde_json::to_value(roles).unwrap();
let mut roles_map = Map::new();
roles_map.insert("roles".to_owned(), roles_object);
let new_state = ProcessState {
commited_in: init_commitment,
pcd_commitment,
encrypted_pcd: Value::Object(roles_map),
..Default::default()
};
let empty_state = ProcessState {
commited_in: init_commitment,
..Default::default()
};
let target = vec![&empty_state, &new_state];
assert_eq!(concurrent_states, target);
}
// The tests below are disabled.
// NOTE(review): they appear to target an earlier API (e.g. `Process::new(vec![], vec![])`,
// a `CommitMessage` without the `error` field) and would need rewriting before being re-enabled.
// #[test]
// fn test_handle_commit_request_invalid_init_tx() {
// let commit_msg = CommitMessage {
// init_tx: "invalid_tx_hex".to_string(),
// roles: HashMap::new(),
// validation_tokens: vec![],
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
// };
// // Call the function under test
// let result = handle_commit_request(commit_msg);
// // Assertions for error
// assert!(result.is_err());
// assert_eq!(result.unwrap_err().to_string(), "init_tx must be a valid transaction or txid");
// }
// // Example test for adding a new state to an existing commitment
// #[test]
// fn test_handle_commit_request_add_state() {
// // Set up data for adding a state to an existing commitment
// let commit_msg = CommitMessage {
// init_tx: "existing_outpoint_hex".to_string(),
// roles: HashMap::new(),
// validation_tokens: vec![],
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
// };
// // Mock daemon and cache initialization
// let mut daemon = MockDaemon::new();
// daemon.expect_broadcast().returning(|_| Ok(Txid::new()));
// DAEMON.set(Arc::new(Mutex::new(daemon))).unwrap();
// let process_state = Process::new(vec![], vec![]);
// CACHEDPROCESSES.lock().unwrap().insert(OutPoint::new("mock_txid", 0), process_state);
// // Run the function
// let result = handle_commit_request(commit_msg);
// // Assert success and that a new state was added
// assert!(result.is_ok());
// assert_eq!(result.unwrap(), OutPoint::new("mock_txid", 0));
// }
// // Additional tests for errors and validation tokens would follow a similar setup
}