diff --git a/src/daemon.rs b/src/daemon.rs index d3beb46..661bc5f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -294,6 +294,12 @@ impl Daemon { Ok(blockchain_info.chain) } + pub(crate) fn test_mempool_accept(&self, tx: &Transaction) -> Result { + let res = self.rpc.test_mempool_accept(&vec![tx])?; + + Ok(res.get(0).unwrap().clone()) + } + pub(crate) fn broadcast(&self, tx: &Transaction) -> Result { let txid = self.rpc.send_raw_transaction(tx)?; diff --git a/src/main.rs b/src/main.rs index 160a5fa..6d724b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,8 +11,7 @@ use bitcoincore_rpc::json::{self as bitcoin_json}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error}; use sdk_common::{ - network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}, - silentpayments::create_transaction_for_address_with_shared_secret, + error::AnkError, network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}, silentpayments::create_transaction_for_address_with_shared_secret }; use sp_client::bitcoin::{ absolute::LockTime, consensus::{deserialize, serialize}, hex::{DisplayHex, FromHex}, key::TapTweak, script::PushBytesBuf, sighash::{Prevouts, SighashCache}, taproot::Signature, transaction::Version, Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness, XOnlyPublicKey @@ -386,9 +385,12 @@ fn handle_faucet_request( )) } -fn handle_new_tx_request(msg: &str, shared_daemon: SharedDaemon) -> Result { - let mut new_tx_msg = serde_json::from_str::(msg)?; +fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> { let tx = deserialize::(&Vec::from_hex(&new_tx_msg.transaction)?)?; + let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?; + if !mempool_accept.allowed { + return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?; + } if new_tx_msg.tweak_data.is_none() { // we add the tweak_data let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; @@ -398,7 +400,7 @@ fn handle_new_tx_request(msg: &str, shared_daemon: SharedDaemon) -> Result match ank_msg.flag { AnkFlag::Faucet => { debug!("Received a faucet message"); - if let Ok(content) = serde_json::from_str::(&ank_msg.content) + if let Ok(mut content) = serde_json::from_str::(&ank_msg.content) { match handle_faucet_request( &content, @@ -451,10 +453,12 @@ async fn handle_connection( } Err(e) => { log::error!("Failed to send faucet tx: {}", e); + content.error = Some(e.into()); + let payload = serde_json::to_string(&content).expect("Message type shouldn't fail"); if let Err(e) = broadcast_message( peers.clone(), - AnkFlag::Error, - e.to_string(), + AnkFlag::Faucet, + payload, BroadcastType::Sender(addr), ) { log::error!("Failed to broadcast message: {}", e); @@ -467,37 +471,60 @@ async fn handle_connection( } AnkFlag::NewTx => { debug!("Received a new tx message"); - match handle_new_tx_request(&ank_msg.content, shared_daemon.clone()) { - Ok(new_tx_msg) => { - // Repeat the msg to all except sender - if let Err(e) = broadcast_message( - peers.clone(), - AnkFlag::NewTx, - serde_json::to_string(&new_tx_msg) - .expect("This should not fail"), - BroadcastType::ExcludeSender(addr), - ) { - log::error!("Failed to send message with error: {}", e); + if let Ok(mut new_tx_msg) = serde_json::from_str::(&ank_msg.content) { + match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) { + Ok(new_tx_msg) => { + // Repeat the msg to all except sender + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::NewTx, + serde_json::to_string(&new_tx_msg) + .expect("This should not fail"), + BroadcastType::ExcludeSender(addr), + ) { + log::error!("Failed to send message with error: {}", e); + } + } + Err(e) => { + log::error!("handle_new_tx_request returned error: {}", e); + new_tx_msg.error = Some(e.into()); + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::NewTx, + serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"), + BroadcastType::Sender(addr), + ) { + log::error!("Failed to broadcast message: {}", e); + } } } - Err(e) => { - log::error!("handle_new_tx_request returned error: {}", e); - } + } else { + log::error!("Invalid content for new_tx message"); } } - AnkFlag::Error => unimplemented!(), - AnkFlag::Unknown => { - debug!("Received an unknown message"); + AnkFlag::Cipher => { + // For now we just send it to everyone + debug!("Received a cipher message"); if let Err(e) = broadcast_message( peers.clone(), - AnkFlag::Unknown, - serde_json::to_string(&ank_msg.content).expect("This should not fail"), + AnkFlag::Cipher, + ank_msg.content, + BroadcastType::ExcludeSender(addr), + ) { + log::error!("Failed to send message with error: {}", e); + } + } + AnkFlag::Unknown => { + debug!("Received an unknown message"); + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::Unknown, + ank_msg.content, BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } - AnkFlag::Prd => unimplemented!(), }, Err(_) => log::error!("Failed to parse network message"), }