From ec8e4ebef87411ad826d5ebc65e77f9425e506e7 Mon Sep 17 00:00:00 2001 From: Sosthene00 Date: Mon, 29 Apr 2024 16:03:09 +0200 Subject: [PATCH] refactoring --- src/main.rs | 233 +++++++++++++++++++++++++++++----------------------- 1 file changed, 131 insertions(+), 102 deletions(-) diff --git a/src/main.rs b/src/main.rs index e115aaa..59a2515 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use sdk_common::network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}; use sp_client::bitcoin::{ absolute::LockTime, consensus::{deserialize, serialize}, - hex::DisplayHex, + hex::{DisplayHex, FromHex}, key::TapTweak, sighash::{Prevouts, SighashCache}, taproot::Signature, @@ -26,21 +26,16 @@ use sp_client::bitcoin::{ use sp_client::{ bitcoin::secp256k1::{ rand::{thread_rng, Rng}, - Keypair, Message as Secp256k1Message, PublicKey, - Secp256k1, ThirtyTwoByteHash, + Keypair, Message as Secp256k1Message, PublicKey, Secp256k1, ThirtyTwoByteHash, }, spclient::SpWallet, }; use sp_client::db::{JsonFile, Storage}; use sp_client::silentpayments::sending::{generate_recipient_pubkeys, SilentPaymentAddress}; -use sp_client::silentpayments::utils::receiving::{ - calculate_tweak_data, get_pubkey_from_input, -}; +use sp_client::silentpayments::utils::receiving::{calculate_tweak_data, get_pubkey_from_input}; use sp_client::silentpayments::utils::sending::calculate_partial_secret; -use sp_client::spclient::{ - derive_keys_from_seed, Recipient, SpClient, SpendKey, -}; +use sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -61,7 +56,7 @@ type PeerMap = Arc>>; type SharedDaemon = Arc>; -const FAUCET_AMT: Amount = Amount::from_sat(1000); +const FAUCET_AMT: Amount = Amount::from_sat(100_000); pub(crate) trait MutexExt { fn lock_anyhow(&self) -> Result, Error>; @@ -227,8 +222,12 @@ fn faucet_send( .create_new_psbt(inputs.clone(), vec![recipient], None)?; log::debug!("Created psbt: {}", new_psbt); SpClient::set_fees(&mut new_psbt, fee_estimate, sp_address.into())?; - let partial_secret = wallet.get_client().get_partial_secret_from_psbt(&new_psbt)?; - wallet.get_client().fill_sp_outputs(&mut new_psbt, partial_secret)?; + let partial_secret = wallet + .get_client() + .get_partial_secret_from_psbt(&new_psbt)?; + wallet + .get_client() + .fill_sp_outputs(&mut new_psbt, partial_secret)?; log::debug!("Definitive psbt: {}", new_psbt); let mut aux_rand = [0u8; 32]; thread_rng().fill(&mut aux_rand); @@ -292,12 +291,19 @@ fn faucet_send( let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked()); let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked()); + // Take some margin to pay for the fees + if core_tx.output[0].value < FAUCET_AMT * 4 { + return Err(Error::msg("Not enough funds")); + } + + let change_amt = core_tx.output[0].value.checked_sub(FAUCET_AMT).unwrap(); + faucet_tx.output.push(TxOut { value: FAUCET_AMT, script_pubkey: ext_spk, }); faucet_tx.output.push(TxOut { - value: core_tx.output[0].value - FAUCET_AMT, + value: change_amt, script_pubkey: change_spk, }); @@ -352,45 +358,51 @@ fn handle_faucet_request( sp_wallet: Arc, shared_daemon: SharedDaemon, ) -> Result { - if let Ok(sp_address) = SilentPaymentAddress::try_from(msg) { - debug!("Sending bootstrap coins to {}", sp_address); - // send bootstrap coins to this sp_address - let tx = faucet_send(sp_address, sp_wallet.clone(), shared_daemon.clone())?; + let sp_address = SilentPaymentAddress::try_from(msg)?; + debug!("Sending bootstrap coins to {}", sp_address); + // send bootstrap coins to this sp_address + let tx = faucet_send(sp_address, sp_wallet.clone(), shared_daemon.clone())?; - // get the tweak + // get the tweak + let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; + + // get current blockheight + let blkheight: u32 = shared_daemon + .lock_anyhow()? + .get_current_height()? + .try_into()?; + + // update our sp_client with the change output(s) + sp_wallet + .get_wallet()? + .update_wallet_with_transaction(&tx, blkheight, partial_tweak)?; + + debug!("{:?}", sp_wallet); + + debug!("updated the wallet"); + // save to disk + sp_wallet.save()?; + + debug!("saved the wallet"); + Ok(NewTxMessage::new( + serialize(&tx).to_lower_hex_string(), + Some(partial_tweak.to_string()), + )) +} + +fn handle_new_tx_request(msg: &str, shared_daemon: SharedDaemon) -> Result { + let mut new_tx_msg = serde_json::from_str::(msg)?; + let tx = deserialize::(&Vec::from_hex(&new_tx_msg.transaction)?)?; + if new_tx_msg.tweak_data.is_none() { + // we add the tweak_data let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; - - debug!("Got the tweak"); - - // get current blockheight - let blkheight: u32 = shared_daemon - .lock_anyhow()? - .get_current_height()? - .try_into()?; - - // update our sp_client with the change output(s) - sp_wallet.get_wallet()?.update_wallet_with_transaction( - &tx, - blkheight, - partial_tweak, - )?; - - debug!("{:?}", sp_wallet.get_wallet()?.get_outputs()); - - debug!("updated the wallet"); - // save to disk - sp_wallet.save()?; - - debug!("saved the wallet"); - Ok(NewTxMessage::new( - serialize(&tx).to_lower_hex_string(), - Some(partial_tweak.to_string()), - )) - } else { - Err(Error::msg(format!( - "faucet message with unparsable sp_address" - ))) + new_tx_msg.tweak_data = Some(partial_tweak.to_string()); } + + // we try to broadcast it + shared_daemon.lock_anyhow()?.broadcast(&tx)?; + + Ok(new_tx_msg) } async fn handle_connection( @@ -419,65 +431,83 @@ async fn handle_connection( let (outgoing, incoming) = ws_stream.split(); - let broadcast_incoming = incoming.try_for_each({ + let broadcast_incoming = incoming.try_for_each(|msg| { let peers = peers.clone(); - move |msg| { - if let Ok(raw_msg) = msg.to_text() { - debug!("Received msg: {}", raw_msg); - let parsed = serde_json::from_str::(raw_msg); - match parsed { - Ok(ank_msg) => match ank_msg.flag { - AnkFlag::Faucet => { - debug!("Received a faucet message"); - if let Ok(content) = - serde_json::from_str::(&ank_msg.content) - { - match handle_faucet_request( - &content.sp_address, - sp_wallet.clone(), - shared_daemon.clone(), - ) { - Ok(new_tx_msg) => { - if let Err(e) = broadcast_message( - peers.clone(), - AnkFlag::NewTx, - serde_json::to_string(&new_tx_msg) - .expect("This should not fail"), - BroadcastType::Sender(addr), - ) { - log::error!( - "Failed to broadcast message: {}", - e.to_string() - ); - } - } - Err(e) => { - if let Err(e) = broadcast_message( - peers.clone(), - AnkFlag::Error, - e.to_string(), - BroadcastType::Sender(addr), - ) { - log::error!("Failed to broadcast message: {}", e); - } + if let Ok(raw_msg) = msg.to_text() { + debug!("Received msg: {}", raw_msg); + let parsed = serde_json::from_str::(raw_msg); + match parsed { + Ok(ank_msg) => match ank_msg.flag { + AnkFlag::Faucet => { + debug!("Received a faucet message"); + if let Ok(content) = serde_json::from_str::(&ank_msg.content) + { + match handle_faucet_request( + &content.sp_address, + sp_wallet.clone(), + shared_daemon.clone(), + ) { + Ok(new_tx_msg) => { + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::NewTx, + serde_json::to_string(&new_tx_msg) + .expect("This should not fail"), + BroadcastType::Sender(addr), + ) { + log::error!( + "Failed to broadcast message: {}", + e.to_string() + ); } } - } else { - log::error!("Invalid content for faucet message"); + Err(e) => { + log::error!("Failed to send faucet tx: {}", e); + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::Error, + e.to_string(), + BroadcastType::Sender(addr), + ) { + log::error!("Failed to broadcast message: {}", e); + } + } + } + } else { + log::error!("Invalid content for faucet message"); + } + } + AnkFlag::NewTx => { + debug!("Received a new tx message"); + match handle_new_tx_request(&ank_msg.content, shared_daemon.clone()) { + Ok(new_tx_msg) => { + // Repeat the msg to all except sender + if let Err(e) = broadcast_message( + peers.clone(), + AnkFlag::NewTx, + serde_json::to_string(&new_tx_msg) + .expect("This should not fail"), + BroadcastType::ExcludeSender(addr), + ) { + log::error!("Failed to send message with error: {}", e); + } + } + Err(e) => { + log::error!("handle_new_tx_request returned error: {}", e); } } - AnkFlag::NewTx => unimplemented!(), - AnkFlag::Error => unimplemented!(), - AnkFlag::Unknown => unimplemented!(), - }, - Err(_) => log::error!("Failed to parse network message"), - } - } else { - // we don't care - log::debug!("Received non-text message {} from peer {}", msg, addr); + } + AnkFlag::Error => unimplemented!(), + AnkFlag::Unknown => unimplemented!(), + AnkFlag::Prd => unimplemented!(), + }, + Err(_) => log::error!("Failed to parse network message"), } - future::ok(()) + } else { + // we don't care + log::debug!("Received non-text message {} from peer {}", msg, addr); } + future::ok(()) }); let receive_from_others = UnboundedReceiverStream::new(rx) @@ -557,7 +587,6 @@ async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon) { socket.connect("tcp://127.0.0.1:29100").await.unwrap(); socket.subscribe("rawtx").await.unwrap(); // socket.subscribe("hashblock"); - debug!("{:?}", socket.type_id()); loop { let core_msg = match socket.recv().await { Ok(m) => m,