From bc625955e9c4eda755787c248b33975bfdf36122 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Fri, 4 Oct 2024 09:22:03 +0200 Subject: [PATCH] Add commit logic --- src/commit.rs | 202 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 10 ++- src/message.rs | 27 ++++++- 3 files changed, 234 insertions(+), 5 deletions(-) create mode 100644 src/commit.rs diff --git a/src/commit.rs b/src/commit.rs new file mode 100644 index 0000000..b0bb5be --- /dev/null +++ b/src/commit.rs @@ -0,0 +1,202 @@ +use std::collections::HashMap; + +use anyhow::{Error, Result}; + +use hex::FromHex; +use log::debug; +use sdk_common::pcd::AnkPcdHash; +use sdk_common::silentpayments::create_transaction; +use sdk_common::sp_client::spclient::Recipient; +use sdk_common::{error::AnkError, network::CommitMessage}; +use sdk_common::sp_client::bitcoin::consensus::deserialize; +use sdk_common::sp_client::bitcoin::{Amount, Transaction, Txid, OutPoint}; +use sdk_common::process::{Process, ProcessState, CACHEDPROCESSES}; +use serde_json::Value; + +use crate::{lock_freezed_utxos, MutexExt, DAEMON, WALLET}; + +pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { + // Attempt to deserialize `init_tx` as a `Transaction` + if let Ok(tx) = deserialize::(&Vec::from_hex(&commit_msg.init_tx)?) { + // This is the first transaction of a chain of commitments + + // Ensure the transaction has only one output + if tx.output.len() != 1 { + return Err(AnkError::NewTxError( + "Transaction must have only one output".to_string(), + ))?; + } + + // TODO: Check that the output pays us + + // Validation tokens must be empty for the initial transaction + if !commit_msg.validation_tokens.is_empty() { + return Err(AnkError::NewTxError( + "Validation tokens must be empty".to_string(), + ))?; + } + + // Obtain the daemon instance and broadcast the transaction + let daemon = DAEMON.get().unwrap().lock_anyhow()?; + daemon.broadcast(&tx)?; + + // Create the root commitment outpoint + let root_commitment = OutPoint::new(tx.txid(), 0); + + // Initialize the process state + let init_state = ProcessState { + commited_in: root_commitment, + encrypted_pcd: Value::Object(commit_msg.encrypted_pcd), + keys: commit_msg.keys, + validation_tokens: vec![], + }; + + // Access the cached processes and insert the new commitment + let mut commitments = CACHEDPROCESSES + .get() + .ok_or(Error::msg("CACHEDPROCESSES not initialized"))? + .lock_anyhow()?; + // We are confident that `root_commitment` doesn't exist in the map + commitments.insert( + root_commitment, + Process::new(vec![init_state], HashMap::new(), vec![]), + ); + + // Add the outpoint to the list of frozen UTXOs + let new_root_outpoint = OutPoint::new(tx.txid(), 0); + lock_freezed_utxos()?.insert(new_root_outpoint); + + // Wait for validation tokens to spend the new output and commit the hash + Ok(new_root_outpoint) + } + // Attempt to deserialize `init_tx` as an `OutPoint` + else if let Ok(outpoint) = deserialize::(&Vec::from_hex(&commit_msg.init_tx)?) { + // Reference the first transaction of a chain + let mut commitments = CACHEDPROCESSES + .get() + .ok_or(Error::msg("CACHEDPROCESSES not initialized"))? + .lock_anyhow()?; + let commitment = commitments + .get_mut(&outpoint) + .ok_or(Error::msg("Commitment not found"))?; + + let pcd_hash = AnkPcdHash::from_map(&commit_msg.encrypted_pcd); + + if commit_msg.validation_tokens.is_empty() { + // Register a new state if validation tokens are empty + + // Get all the latest concurrent states + let concurrent_states = commitment.get_latest_concurrent_states(); + + let current_outpoint = concurrent_states.first().unwrap().commited_in; + + // Check for existing states with the same PCD hash + if concurrent_states + .into_iter() + .any(|state| AnkPcdHash::from_value(&state.encrypted_pcd) == pcd_hash) + { + return Err(anyhow::Error::msg("Proposed state already exists")); + } + + // Insert the new process state + commitment.insert_state(ProcessState { + commited_in: current_outpoint, + encrypted_pcd: Value::Object(commit_msg.encrypted_pcd), + keys: commit_msg.keys, + validation_tokens: vec![], + }); + + Ok(current_outpoint) + } else { + // Validation tokens are provided; process the pending state + + let new_state_commitment = AnkPcdHash::from_map(&commit_msg.encrypted_pcd); + + // Clone the previous state, we'll need it for validation purpose + if let Some(mut state_to_validate) = commitment.get_latest_concurrent_states() + .into_iter() + .find(|s| { + AnkPcdHash::from_value(&s.encrypted_pcd) == new_state_commitment + }) + .cloned() + { + // We update the validation tokens for our clone + state_to_validate.validation_tokens = commit_msg.validation_tokens; + let previous_state = commitment.get_previous_state(&state_to_validate); + if state_to_validate.is_valid(previous_state)? { + // If the new state is valid we commit it in a new transaction that spends the last commited_in + // By spending it we also know the next outpoint to monitor for the next state + // We add a placeholder state with that information at the tip of the chain + // We also remove all concurrent states that didn't get validated + let mut freezed_utxos = lock_freezed_utxos()?; + let mandatory_input = if freezed_utxos.remove(&state_to_validate.commited_in) { + state_to_validate.commited_in + } else { + // This shoudln't happen, except if we send the commitment message to another relay + // We need to think about the case a relay is down + // Probably relays should have backups of their keys so that it can be bootstrapped again and resume commiting + return Err(Error::msg("Commitment utxo doesn't exist")) + }; + + // TODO we should make this sequence more atomic by handling errors in a way that we get back to the current state if any step fails + + let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?.get_wallet()?; + + let recipient = Recipient { + address: sp_wallet.get_client().get_receiving_address(), + amount: Amount::from_sat(1000), + nb_outputs: 1 + }; + + let daemon = DAEMON.get().unwrap().lock_anyhow()?; + let fee_rate = daemon.estimate_fee(6)?; + + let psbt = create_transaction( + vec![mandatory_input], + &freezed_utxos, + &sp_wallet, + vec![recipient], + None, + fee_rate, + None + )?; + + let new_tx = psbt.extract_tx()?; + + daemon.test_mempool_accept(&new_tx)?; + + // We're ready to commit, we first update our process states + // We remove all concurrent states + let rm_states = commitment.remove_latest_concurrent_states(); + debug!("removed states: {:?}", rm_states); + // We push the validated state back + commitment.insert_state(state_to_validate); + + // We broadcast transaction + let txid = daemon.broadcast(&new_tx)?; + + // We push a new, empty state commited in the newly created output + let commited_in = OutPoint::new(txid, 0); + + // Add the newly created outpoint to our list of freezed utxos + freezed_utxos.insert(commited_in); + + let empty_state = ProcessState { + commited_in, + ..Default::default() + }; + + commitment.insert_state(empty_state); + + Ok(commited_in) + } else { + return Err(Error::msg("Invalid state")); + } + } else { + return Err(Error::msg("Unknown proposal, must create it first before sending validations")); + } + } + } else { + Err(Error::msg("init_tx must be a valid transaction or txid")) + } +} diff --git a/src/main.rs b/src/main.rs index 57c16c4..da44954 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,17 +10,18 @@ use std::{ sync::{Mutex, MutexGuard, OnceLock}, }; -use bitcoincore_rpc::{bitcoin::OutPoint, json::{self as bitcoin_json}}; +use bitcoincore_rpc::json::{self as bitcoin_json}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error, warn}; use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE}; use scan::compute_partial_tweak_to_transaction; -use sdk_common::{sp_client::bitcoin::{ +use sdk_common::{pcd::{AnkPcdHash, RoleDefinition}, process::CACHEDPROCESSES, signature::{AnkHash, Proof}, sp_client::bitcoin::{ consensus::deserialize, hex::{DisplayHex, FromHex}, Amount, Network, Transaction, }, MutexExt}; use sdk_common::sp_client::{ + bitcoin::{Txid, OutPoint}, bitcoin::secp256k1::rand::{thread_rng, Rng}, spclient::SpWallet, }; @@ -44,6 +45,7 @@ mod electrumclient; mod faucet; mod message; mod scan; +mod commit; use crate::config::Config; use crate::{daemon::Daemon, scan::scan_blocks}; @@ -293,6 +295,10 @@ async fn main() -> Result<()> { warn!("Running on mainnet, you're on your own"); } + CACHEDPROCESSES + .set(Mutex::new(HashMap::new())) + .expect("CachedProcesses initialization failed"); + MESSAGECACHE .set(MessageCache::new()) .expect("Message Cache initialization failed"); diff --git a/src/message.rs b/src/message.rs index 96621ec..bebf93e 100644 --- a/src/message.rs +++ b/src/message.rs @@ -8,9 +8,9 @@ use std::{ use tokio::time; use tokio_tungstenite::tungstenite::Message; -use sdk_common::network::{AnkFlag, Envelope, FaucetMessage, NewTxMessage}; +use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage}; -use crate::{faucet::handle_faucet_request, handle_new_tx_request, PEERMAP}; +use crate::{commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP}; pub(crate) static MESSAGECACHE: OnceLock = OnceLock::new(); @@ -181,7 +181,27 @@ fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) { } } -fn process_unknown_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) { +fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) { + if let Ok(mut commit_msg) = serde_json::from_str::(&ank_msg.content) { + match handle_commit_request(commit_msg.clone()) { + Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint), + Err(e) => { + log::error!("handle_commit_request returned error: {}", e); + commit_msg.error = Some(e.into()); + if let Err(e) = broadcast_message( + AnkFlag::Commit, + serde_json::to_string(&commit_msg).expect("This shouldn't fail"), + BroadcastType::Sender(addr) + ) + { + log::error!("Failed to broadcast message: {}", e); + } + } + }; + } +} + +fn process_unknown_message(ank_msg: Envelope, addr: SocketAddr) { log::debug!("Received an unknown message"); if let Err(e) = broadcast_message( AnkFlag::Unknown, @@ -206,6 +226,7 @@ pub fn process_message(raw_msg: &str, addr: SocketAddr) { AnkFlag::Faucet => process_faucet_message(ank_msg, addr), AnkFlag::NewTx => process_new_tx_message(ank_msg, addr), AnkFlag::Cipher => process_cipher_message(ank_msg, addr), + AnkFlag::Commit => process_commit_message(ank_msg, addr), AnkFlag::Unknown => process_unknown_message(ank_msg, addr), }, Err(_) => log::error!("Failed to parse network message"),