Handle new message types and better error management

This commit is contained in:
Sosthene 2024-05-27 12:10:39 +02:00
parent de84c8a1bf
commit 539670d248
2 changed files with 61 additions and 28 deletions

View File

@ -294,6 +294,12 @@ impl Daemon {
Ok(blockchain_info.chain)
}
pub(crate) fn test_mempool_accept(&self, tx: &Transaction) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
let res = self.rpc.test_mempool_accept(&vec![tx])?;
Ok(res.get(0).unwrap().clone())
}
pub(crate) fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
let txid = self.rpc.send_raw_transaction(tx)?;

View File

@ -11,8 +11,7 @@ use bitcoincore_rpc::json::{self as bitcoin_json};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error};
use sdk_common::{
network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage},
silentpayments::create_transaction_for_address_with_shared_secret,
error::AnkError, network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}, silentpayments::create_transaction_for_address_with_shared_secret
};
use sp_client::bitcoin::{
absolute::LockTime, consensus::{deserialize, serialize}, hex::{DisplayHex, FromHex}, key::TapTweak, script::PushBytesBuf, sighash::{Prevouts, SighashCache}, taproot::Signature, transaction::Version, Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness, XOnlyPublicKey
@ -386,9 +385,12 @@ fn handle_faucet_request(
))
}
fn handle_new_tx_request(msg: &str, shared_daemon: SharedDaemon) -> Result<NewTxMessage> {
let mut new_tx_msg = serde_json::from_str::<NewTxMessage>(msg)?;
fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> {
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?;
if !mempool_accept.allowed {
return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?;
}
if new_tx_msg.tweak_data.is_none() {
// we add the tweak_data
let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?;
@ -398,7 +400,7 @@ fn handle_new_tx_request(msg: &str, shared_daemon: SharedDaemon) -> Result<NewTx
// we try to broadcast it
shared_daemon.lock_anyhow()?.broadcast(&tx)?;
Ok(new_tx_msg)
Ok(())
}
async fn handle_connection(
@ -436,7 +438,7 @@ async fn handle_connection(
Ok(ank_msg) => match ank_msg.flag {
AnkFlag::Faucet => {
debug!("Received a faucet message");
if let Ok(content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content)
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content)
{
match handle_faucet_request(
&content,
@ -451,10 +453,12 @@ async fn handle_connection(
}
Err(e) => {
log::error!("Failed to send faucet tx: {}", e);
content.error = Some(e.into());
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::Error,
e.to_string(),
AnkFlag::Faucet,
payload,
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
@ -467,37 +471,60 @@ async fn handle_connection(
}
AnkFlag::NewTx => {
debug!("Received a new tx message");
match handle_new_tx_request(&ank_msg.content, shared_daemon.clone()) {
Ok(new_tx_msg) => {
// Repeat the msg to all except sender
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg)
.expect("This should not fail"),
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) {
Ok(new_tx_msg) => {
// Repeat the msg to all except sender
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg)
.expect("This should not fail"),
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
Err(e) => {
log::error!("handle_new_tx_request returned error: {}", e);
new_tx_msg.error = Some(e.into());
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
}
Err(e) => {
log::error!("handle_new_tx_request returned error: {}", e);
}
} else {
log::error!("Invalid content for new_tx message");
}
}
AnkFlag::Error => unimplemented!(),
AnkFlag::Unknown => {
debug!("Received an unknown message");
AnkFlag::Cipher => {
// For now we just send it to everyone
debug!("Received a cipher message");
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::Unknown,
serde_json::to_string(&ank_msg.content).expect("This should not fail"),
AnkFlag::Cipher,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
AnkFlag::Unknown => {
debug!("Received an unknown message");
if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::Unknown,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
AnkFlag::Prd => unimplemented!(),
},
Err(_) => log::error!("Failed to parse network message"),
}