Add commit logic
This commit is contained in:
parent
91b0c8494e
commit
a4f76b0252
202
src/commit.rs
Normal file
202
src/commit.rs
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use anyhow::{Error, Result};
|
||||||
|
|
||||||
|
use hex::FromHex;
|
||||||
|
use log::debug;
|
||||||
|
use sdk_common::pcd::AnkPcdHash;
|
||||||
|
use sdk_common::silentpayments::create_transaction;
|
||||||
|
use sdk_common::sp_client::spclient::Recipient;
|
||||||
|
use sdk_common::{error::AnkError, network::CommitMessage};
|
||||||
|
use sdk_common::sp_client::bitcoin::consensus::deserialize;
|
||||||
|
use sdk_common::sp_client::bitcoin::{Amount, Transaction, Txid, OutPoint};
|
||||||
|
use sdk_common::process::{Process, ProcessState, CACHEDPROCESSES};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::{lock_freezed_utxos, MutexExt, DAEMON, WALLET};
|
||||||
|
|
||||||
|
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||||
|
// Attempt to deserialize `init_tx` as a `Transaction`
|
||||||
|
if let Ok(tx) = deserialize::<Transaction>(&Vec::from_hex(&commit_msg.init_tx)?) {
|
||||||
|
// This is the first transaction of a chain of commitments
|
||||||
|
|
||||||
|
// Ensure the transaction has only one output
|
||||||
|
if tx.output.len() != 1 {
|
||||||
|
return Err(AnkError::NewTxError(
|
||||||
|
"Transaction must have only one output".to_string(),
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Check that the output pays us
|
||||||
|
|
||||||
|
// Validation tokens must be empty for the initial transaction
|
||||||
|
if !commit_msg.validation_tokens.is_empty() {
|
||||||
|
return Err(AnkError::NewTxError(
|
||||||
|
"Validation tokens must be empty".to_string(),
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain the daemon instance and broadcast the transaction
|
||||||
|
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||||
|
daemon.broadcast(&tx)?;
|
||||||
|
|
||||||
|
// Create the root commitment outpoint
|
||||||
|
let root_commitment = OutPoint::new(tx.txid(), 0);
|
||||||
|
|
||||||
|
// Initialize the process state
|
||||||
|
let init_state = ProcessState {
|
||||||
|
commited_in: root_commitment,
|
||||||
|
encrypted_pcd: Value::Object(commit_msg.encrypted_pcd),
|
||||||
|
keys: commit_msg.keys,
|
||||||
|
validation_tokens: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
// Access the cached processes and insert the new commitment
|
||||||
|
let mut commitments = CACHEDPROCESSES
|
||||||
|
.get()
|
||||||
|
.ok_or(Error::msg("CACHEDPROCESSES not initialized"))?
|
||||||
|
.lock_anyhow()?;
|
||||||
|
// We are confident that `root_commitment` doesn't exist in the map
|
||||||
|
commitments.insert(
|
||||||
|
root_commitment,
|
||||||
|
Process::new(vec![init_state], HashMap::new(), vec![]),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add the outpoint to the list of frozen UTXOs
|
||||||
|
let new_root_outpoint = OutPoint::new(tx.txid(), 0);
|
||||||
|
lock_freezed_utxos()?.insert(new_root_outpoint);
|
||||||
|
|
||||||
|
// Wait for validation tokens to spend the new output and commit the hash
|
||||||
|
Ok(new_root_outpoint)
|
||||||
|
}
|
||||||
|
// Attempt to deserialize `init_tx` as an `OutPoint`
|
||||||
|
else if let Ok(outpoint) = deserialize::<OutPoint>(&Vec::from_hex(&commit_msg.init_tx)?) {
|
||||||
|
// Reference the first transaction of a chain
|
||||||
|
let mut commitments = CACHEDPROCESSES
|
||||||
|
.get()
|
||||||
|
.ok_or(Error::msg("CACHEDPROCESSES not initialized"))?
|
||||||
|
.lock_anyhow()?;
|
||||||
|
let commitment = commitments
|
||||||
|
.get_mut(&outpoint)
|
||||||
|
.ok_or(Error::msg("Commitment not found"))?;
|
||||||
|
|
||||||
|
let pcd_hash = AnkPcdHash::from_map(&commit_msg.encrypted_pcd);
|
||||||
|
|
||||||
|
if commit_msg.validation_tokens.is_empty() {
|
||||||
|
// Register a new state if validation tokens are empty
|
||||||
|
|
||||||
|
// Get all the latest concurrent states
|
||||||
|
let concurrent_states = commitment.get_latest_concurrent_states();
|
||||||
|
|
||||||
|
let current_outpoint = concurrent_states.first().unwrap().commited_in;
|
||||||
|
|
||||||
|
// Check for existing states with the same PCD hash
|
||||||
|
if concurrent_states
|
||||||
|
.into_iter()
|
||||||
|
.any(|state| AnkPcdHash::from_value(&state.encrypted_pcd) == pcd_hash)
|
||||||
|
{
|
||||||
|
return Err(anyhow::Error::msg("Proposed state already exists"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the new process state
|
||||||
|
commitment.insert_state(ProcessState {
|
||||||
|
commited_in: current_outpoint,
|
||||||
|
encrypted_pcd: Value::Object(commit_msg.encrypted_pcd),
|
||||||
|
keys: commit_msg.keys,
|
||||||
|
validation_tokens: vec![],
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(current_outpoint)
|
||||||
|
} else {
|
||||||
|
// Validation tokens are provided; process the pending state
|
||||||
|
|
||||||
|
let new_state_commitment = AnkPcdHash::from_map(&commit_msg.encrypted_pcd);
|
||||||
|
|
||||||
|
// Clone the previous state, we'll need it for validation purpose
|
||||||
|
if let Some(mut state_to_validate) = commitment.get_latest_concurrent_states()
|
||||||
|
.into_iter()
|
||||||
|
.find(|s| {
|
||||||
|
AnkPcdHash::from_value(&s.encrypted_pcd) == new_state_commitment
|
||||||
|
})
|
||||||
|
.cloned()
|
||||||
|
{
|
||||||
|
// We update the validation tokens for our clone
|
||||||
|
state_to_validate.validation_tokens = commit_msg.validation_tokens;
|
||||||
|
let previous_state = commitment.get_previous_state(&state_to_validate);
|
||||||
|
if state_to_validate.is_valid(previous_state)? {
|
||||||
|
// If the new state is valid we commit it in a new transaction that spends the last commited_in
|
||||||
|
// By spending it we also know the next outpoint to monitor for the next state
|
||||||
|
// We add a placeholder state with that information at the tip of the chain
|
||||||
|
// We also remove all concurrent states that didn't get validated
|
||||||
|
let mut freezed_utxos = lock_freezed_utxos()?;
|
||||||
|
let mandatory_input = if freezed_utxos.remove(&state_to_validate.commited_in) {
|
||||||
|
state_to_validate.commited_in
|
||||||
|
} else {
|
||||||
|
// This shoudln't happen, except if we send the commitment message to another relay
|
||||||
|
// We need to think about the case a relay is down
|
||||||
|
// Probably relays should have backups of their keys so that it can be bootstrapped again and resume commiting
|
||||||
|
return Err(Error::msg("Commitment utxo doesn't exist"))
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO we should make this sequence more atomic by handling errors in a way that we get back to the current state if any step fails
|
||||||
|
|
||||||
|
let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?.get_wallet()?;
|
||||||
|
|
||||||
|
let recipient = Recipient {
|
||||||
|
address: sp_wallet.get_client().get_receiving_address(),
|
||||||
|
amount: Amount::from_sat(1000),
|
||||||
|
nb_outputs: 1
|
||||||
|
};
|
||||||
|
|
||||||
|
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||||
|
let fee_rate = daemon.estimate_fee(6)?;
|
||||||
|
|
||||||
|
let psbt = create_transaction(
|
||||||
|
vec![mandatory_input],
|
||||||
|
&freezed_utxos,
|
||||||
|
&sp_wallet,
|
||||||
|
vec![recipient],
|
||||||
|
None,
|
||||||
|
fee_rate,
|
||||||
|
None
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let new_tx = psbt.extract_tx()?;
|
||||||
|
|
||||||
|
daemon.test_mempool_accept(&new_tx)?;
|
||||||
|
|
||||||
|
// We're ready to commit, we first update our process states
|
||||||
|
// We remove all concurrent states
|
||||||
|
let rm_states = commitment.remove_latest_concurrent_states();
|
||||||
|
debug!("removed states: {:?}", rm_states);
|
||||||
|
// We push the validated state back
|
||||||
|
commitment.insert_state(state_to_validate);
|
||||||
|
|
||||||
|
// We broadcast transaction
|
||||||
|
let txid = daemon.broadcast(&new_tx)?;
|
||||||
|
|
||||||
|
// We push a new, empty state commited in the newly created output
|
||||||
|
let commited_in = OutPoint::new(txid, 0);
|
||||||
|
|
||||||
|
// Add the newly created outpoint to our list of freezed utxos
|
||||||
|
freezed_utxos.insert(commited_in);
|
||||||
|
|
||||||
|
let empty_state = ProcessState {
|
||||||
|
commited_in,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
commitment.insert_state(empty_state);
|
||||||
|
|
||||||
|
Ok(commited_in)
|
||||||
|
} else {
|
||||||
|
return Err(Error::msg("Invalid state"));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(Error::msg("Unknown proposal, must create it first before sending validations"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(Error::msg("init_tx must be a valid transaction or txid"))
|
||||||
|
}
|
||||||
|
}
|
10
src/main.rs
10
src/main.rs
@ -10,17 +10,18 @@ use std::{
|
|||||||
sync::{Mutex, MutexGuard, OnceLock},
|
sync::{Mutex, MutexGuard, OnceLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
use bitcoincore_rpc::{bitcoin::OutPoint, json::{self as bitcoin_json}};
|
use bitcoincore_rpc::json::{self as bitcoin_json};
|
||||||
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
||||||
use log::{debug, error, warn};
|
use log::{debug, error, warn};
|
||||||
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
||||||
use scan::compute_partial_tweak_to_transaction;
|
use scan::compute_partial_tweak_to_transaction;
|
||||||
use sdk_common::{sp_client::bitcoin::{
|
use sdk_common::{pcd::{AnkPcdHash, RoleDefinition}, process::CACHEDPROCESSES, signature::{AnkHash, Proof}, sp_client::bitcoin::{
|
||||||
consensus::deserialize,
|
consensus::deserialize,
|
||||||
hex::{DisplayHex, FromHex},
|
hex::{DisplayHex, FromHex},
|
||||||
Amount, Network, Transaction,
|
Amount, Network, Transaction,
|
||||||
}, MutexExt};
|
}, MutexExt};
|
||||||
use sdk_common::sp_client::{
|
use sdk_common::sp_client::{
|
||||||
|
bitcoin::{Txid, OutPoint},
|
||||||
bitcoin::secp256k1::rand::{thread_rng, Rng},
|
bitcoin::secp256k1::rand::{thread_rng, Rng},
|
||||||
spclient::SpWallet,
|
spclient::SpWallet,
|
||||||
};
|
};
|
||||||
@ -44,6 +45,7 @@ mod electrumclient;
|
|||||||
mod faucet;
|
mod faucet;
|
||||||
mod message;
|
mod message;
|
||||||
mod scan;
|
mod scan;
|
||||||
|
mod commit;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::{daemon::Daemon, scan::scan_blocks};
|
use crate::{daemon::Daemon, scan::scan_blocks};
|
||||||
@ -293,6 +295,10 @@ async fn main() -> Result<()> {
|
|||||||
warn!("Running on mainnet, you're on your own");
|
warn!("Running on mainnet, you're on your own");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CACHEDPROCESSES
|
||||||
|
.set(Mutex::new(HashMap::new()))
|
||||||
|
.expect("CachedProcesses initialization failed");
|
||||||
|
|
||||||
MESSAGECACHE
|
MESSAGECACHE
|
||||||
.set(MessageCache::new())
|
.set(MessageCache::new())
|
||||||
.expect("Message Cache initialization failed");
|
.expect("Message Cache initialization failed");
|
||||||
|
@ -8,9 +8,9 @@ use std::{
|
|||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
|
|
||||||
use sdk_common::network::{AnkFlag, Envelope, FaucetMessage, NewTxMessage};
|
use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage};
|
||||||
|
|
||||||
use crate::{faucet::handle_faucet_request, handle_new_tx_request, PEERMAP};
|
use crate::{commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP};
|
||||||
|
|
||||||
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
|
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
|
||||||
|
|
||||||
@ -181,7 +181,27 @@ fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_unknown_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) {
|
fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||||
|
if let Ok(mut commit_msg) = serde_json::from_str::<CommitMessage>(&ank_msg.content) {
|
||||||
|
match handle_commit_request(commit_msg.clone()) {
|
||||||
|
Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint),
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("handle_commit_request returned error: {}", e);
|
||||||
|
commit_msg.error = Some(e.into());
|
||||||
|
if let Err(e) = broadcast_message(
|
||||||
|
AnkFlag::Commit,
|
||||||
|
serde_json::to_string(&commit_msg).expect("This shouldn't fail"),
|
||||||
|
BroadcastType::Sender(addr)
|
||||||
|
)
|
||||||
|
{
|
||||||
|
log::error!("Failed to broadcast message: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_unknown_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||||
log::debug!("Received an unknown message");
|
log::debug!("Received an unknown message");
|
||||||
if let Err(e) = broadcast_message(
|
if let Err(e) = broadcast_message(
|
||||||
AnkFlag::Unknown,
|
AnkFlag::Unknown,
|
||||||
@ -206,6 +226,7 @@ pub fn process_message(raw_msg: &str, addr: SocketAddr) {
|
|||||||
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
|
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
|
||||||
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
|
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
|
||||||
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
|
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
|
||||||
|
AnkFlag::Commit => process_commit_message(ank_msg, addr),
|
||||||
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
|
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
|
||||||
},
|
},
|
||||||
Err(_) => log::error!("Failed to parse network message"),
|
Err(_) => log::error!("Failed to parse network message"),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user