From ff64646e18dd4bfedc0c399bc69438d8e0a8c9f2 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Fri, 20 Jun 2025 10:35:17 +0200 Subject: [PATCH] Cargo fmt --- src/commit.rs | 8 ++--- src/main.rs | 83 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/src/commit.rs b/src/commit.rs index 0dd9c1a..ecd16de 100644 --- a/src/commit.rs +++ b/src/commit.rs @@ -18,11 +18,11 @@ use sdk_common::{ sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress}, }; -use crate::{ - CHAIN_TIP, - message::{broadcast_message, BroadcastType}, -}; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; +use crate::{ + message::{broadcast_message, BroadcastType}, + CHAIN_TIP, +}; pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { let mut processes = lock_processes()?; diff --git a/src/main.rs b/src/main.rs index b80215c..ff730b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,9 +19,6 @@ use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error, warn}; use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE}; use scan::{check_transaction_alone, compute_partial_tweak_to_transaction}; -use sdk_common::{sp_client::{ - bitcoin::{secp256k1::rand::thread_rng, OutPoint}, OutputSpendStatus, SpClient, SpendKey -}, updates::{init_update_sink, NativeUpdateSink, StateUpdate}}; use sdk_common::network::{AnkFlag, NewTxMessage}; use sdk_common::{ network::HandshakeMessage, @@ -39,6 +36,13 @@ use sdk_common::{ }, MutexExt, }; +use sdk_common::{ + sp_client::{ + bitcoin::{secp256k1::rand::thread_rng, OutPoint}, + OutputSpendStatus, SpClient, SpendKey, + }, + updates::{init_update_sink, NativeUpdateSink, StateUpdate}, +}; use serde_json::Value; use tokio::net::{TcpListener, TcpStream}; @@ -261,22 +265,32 @@ fn create_new_tx_message(transaction: Vec) -> Result { )) } -async fn handle_scan_updates(scan_rx: std::sync::mpsc::Receiver) { +async fn handle_scan_updates( + scan_rx: std::sync::mpsc::Receiver, +) { while let Ok(update) = scan_rx.recv() { log::debug!("Received scan update: {:?}", update); } } -async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver) { +async fn handle_state_updates( + state_rx: std::sync::mpsc::Receiver, +) { while let Ok(update) = state_rx.recv() { match update { - StateUpdate::Update { blkheight, blkhash, found_outputs, found_inputs } => { + StateUpdate::Update { + blkheight, + blkhash, + found_outputs, + found_inputs, + } => { // We update the wallet with found outputs and inputs let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); // inputs first for outpoint in found_inputs { if let Some(output) = sp_wallet.get_mut_outputs().get_mut(&outpoint) { - output.spend_status = OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array()); + output.spend_status = + OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array()); } else { // We found an input that we don't have in our wallet, that shouldn't happen error!("Spent unknown output: {:?}", outpoint); @@ -285,15 +299,29 @@ async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver { + StateUpdate::NoUpdate { blkheight } => { // We just keep the current height to update the last_scan debug!("No update, setting last scan at {}", blkheight); let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); sp_wallet.set_last_scan(blkheight.to_consensus_u32()); let json = serde_json::to_value(sp_wallet.clone()).unwrap(); - STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap(); + STORAGE + .get() + .unwrap() + .lock_anyhow() + .unwrap() + .wallet_file + .save(&json) + .unwrap(); } } } @@ -330,14 +358,20 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) { } }, Ok("hashblock") => { - let current_height = DAEMON.get().unwrap().lock_anyhow().unwrap().get_current_height().unwrap(); + let current_height = DAEMON + .get() + .unwrap() + .lock_anyhow() + .unwrap() + .get_current_height() + .unwrap(); CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst); - + // Add retry logic for hashblock processing let mut retry_count = 0; const MAX_RETRIES: u32 = 3; const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay - + loop { match scan_blocks(0, &blindbit_url).await { Ok(_) => { @@ -347,21 +381,27 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) { Err(e) => { retry_count += 1; if retry_count >= MAX_RETRIES { - error!("Failed to scan blocks after {} retries: {}", MAX_RETRIES, e); + error!( + "Failed to scan blocks after {} retries: {}", + MAX_RETRIES, e + ); break; } - + // Exponential backoff: 1s, 2s, 4s let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1)); - warn!("Scan failed (attempt {}/{}), retrying in {}ms: {}", - retry_count, MAX_RETRIES, delay_ms, e); - - tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; + warn!( + "Scan failed (attempt {}/{}), retrying in {}ms: {}", + retry_count, MAX_RETRIES, delay_ms, e + ); + + tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)) + .await; } } } continue; - }, + } _ => { error!("Unexpected message in zmq"); continue; @@ -533,7 +573,7 @@ async fn main() -> Result<()> { let (sink, scan_rx, state_rx) = NativeUpdateSink::new(); init_update_sink(Arc::new(sink)); - + // Spawn the update handlers tokio::spawn(handle_scan_updates(scan_rx)); tokio::spawn(handle_state_updates(state_rx)); @@ -543,7 +583,6 @@ async fn main() -> Result<()> { scan_blocks(current_tip - last_scan, &config.blindbit_url).await?; } - // Subscribe to Bitcoin Core let zmq_url = config.zmq_url.clone(); let blindbit_url = config.blindbit_url.clone();