From 76e4c985c14f8e30140bd5ed298d3933c6a457c4 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Fri, 20 Jun 2025 10:34:04 +0200 Subject: [PATCH] Scan block with blindbit instead of electrs --- src/commit.rs | 7 ++- src/config.rs | 3 +- src/main.rs | 120 +++++++++++++++++++++++++++++++++--------- src/scan.rs | 142 +++++++++++++++----------------------------------- 4 files changed, 144 insertions(+), 128 deletions(-) diff --git a/src/commit.rs b/src/commit.rs index aad4bf1..0dd9c1a 100644 --- a/src/commit.rs +++ b/src/commit.rs @@ -18,7 +18,10 @@ use sdk_common::{ sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress}, }; -use crate::message::{broadcast_message, BroadcastType}; +use crate::{ + CHAIN_TIP, + message::{broadcast_message, BroadcastType}, +}; use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET}; pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result { @@ -48,10 +51,12 @@ pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result; type PeerMap = Mutex>; @@ -77,7 +72,7 @@ pub(crate) static PEERMAP: OnceLock = OnceLock::new(); pub(crate) static DAEMON: OnceLock>> = OnceLock::new(); -static CHAIN_TIP: AtomicU64 = AtomicU64::new(0); +static CHAIN_TIP: AtomicU32 = AtomicU32::new(0); pub static FREEZED_UTXOS: OnceLock>> = OnceLock::new(); @@ -205,7 +200,7 @@ async fn handle_connection( our_sp_address.to_string(), OutPointMemberMap(members), OutPointProcessMap(processes), - current_tip, + current_tip.into(), ); if let Err(e) = broadcast_message( @@ -246,7 +241,6 @@ async fn handle_connection( } fn create_new_tx_message(transaction: Vec) -> Result { - // debug!("Creating tx message"); let tx: Transaction = deserialize(&transaction)?; if tx.is_coinbase() { @@ -267,6 +261,44 @@ fn create_new_tx_message(transaction: Vec) -> Result { )) } +async fn handle_scan_updates(scan_rx: std::sync::mpsc::Receiver) { + while let Ok(update) = scan_rx.recv() { + log::debug!("Received scan update: {:?}", update); + } +} + +async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver) { + while let Ok(update) = state_rx.recv() { + match update { + StateUpdate::Update { blkheight, blkhash, found_outputs, found_inputs } => { + // We update the wallet with found outputs and inputs + let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); + // inputs first + for outpoint in found_inputs { + if let Some(output) = sp_wallet.get_mut_outputs().get_mut(&outpoint) { + output.spend_status = OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array()); + } else { + // We found an input that we don't have in our wallet, that shouldn't happen + error!("Spent unknown output: {:?}", outpoint); + } + } + sp_wallet.get_mut_outputs().extend(found_outputs); + sp_wallet.set_last_scan(blkheight.to_consensus_u32()); + let json = serde_json::to_value(sp_wallet.clone()).unwrap(); + STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap(); + } + StateUpdate::NoUpdate { blkheight }=> { + // We just keep the current height to update the last_scan + debug!("No update, setting last scan at {}", blkheight); + let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); + sp_wallet.set_last_scan(blkheight.to_consensus_u32()); + let json = serde_json::to_value(sp_wallet.clone()).unwrap(); + STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap(); + } + } + } +} + async fn handle_zmq(zmq_url: String, blindbit_url: String) { debug!("Starting listening on Core"); let mut socket = zeromq::SubSocket::new(); @@ -297,12 +329,38 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) { continue; } }, - Ok("hashblock") => match scan_blocks(0, &blindbit_url).await { - Ok(_) => continue, - Err(e) => { - error!("{}", e); - continue; + Ok("hashblock") => { + let current_height = DAEMON.get().unwrap().lock_anyhow().unwrap().get_current_height().unwrap(); + CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst); + + // Add retry logic for hashblock processing + let mut retry_count = 0; + const MAX_RETRIES: u32 = 3; + const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay + + loop { + match scan_blocks(0, &blindbit_url).await { + Ok(_) => { + debug!("Successfully scanned blocks after {} retries", retry_count); + break; + } + Err(e) => { + retry_count += 1; + if retry_count >= MAX_RETRIES { + error!("Failed to scan blocks after {} retries: {}", MAX_RETRIES, e); + break; + } + + // Exponential backoff: 1s, 2s, 4s + let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1)); + warn!("Scan failed (attempt {}/{}), retrying in {}ms: {}", + retry_count, MAX_RETRIES, delay_ms, e); + + tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; + } + } } + continue; }, _ => { error!("Unexpected message in zmq"); @@ -356,6 +414,9 @@ async fn main() -> Result<()> { .get_current_height()? .try_into()?; + // Set CHAIN_TIP + CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst); + let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?; app_dir.push(config.data_dir); let mut wallet_file = app_dir.clone(); @@ -470,16 +531,25 @@ async fn main() -> Result<()> { STORAGE.set(Mutex::new(storage)).unwrap(); + let (sink, scan_rx, state_rx) = NativeUpdateSink::new(); + init_update_sink(Arc::new(sink)); + + // Spawn the update handlers + tokio::spawn(handle_scan_updates(scan_rx)); + tokio::spawn(handle_state_updates(state_rx)); + if last_scan < current_tip { log::info!("Scanning for our outputs"); - scan_blocks(current_tip - last_scan, &config.blindbit_url)?; + scan_blocks(current_tip - last_scan, &config.blindbit_url).await?; } - // Set CHAIN_TIP - CHAIN_TIP.store(current_tip as u64, std::sync::atomic::Ordering::SeqCst); // Subscribe to Bitcoin Core - tokio::spawn(handle_zmq(config.zmq_url, config.blindbit_url)); + let zmq_url = config.zmq_url.clone(); + let blindbit_url = config.blindbit_url.clone(); + tokio::spawn(async move { + handle_zmq(zmq_url, blindbit_url).await; + }); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(config.ws_url).await; diff --git a/src/scan.rs b/src/scan.rs index c11bb16..f52fa9a 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -1,22 +1,27 @@ use std::collections::HashMap; use std::str::FromStr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::sync::MutexGuard; use anyhow::{Error, Result}; -use bitcoincore_rpc::bitcoin::absolute::{Amount, Height}; -use sdk_common::silentpayments::{SpWallet, StateUpdater}; +use bitcoincore_rpc::bitcoin::absolute::Height; +use bitcoincore_rpc::bitcoin::Amount; +use sdk_common::silentpayments::SpWallet; use sdk_common::sp_client::bitcoin::bip158::BlockFilter; -use sdk_common::sp_client::bitcoin::hex::DisplayHex; use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey}; use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey}; use sdk_common::sp_client::silentpayments::receiving::Receiver; use sdk_common::sp_client::silentpayments::utils::receiving::{ calculate_tweak_data, get_pubkey_from_input, }; -use sdk_common::sp_client::{OutputSpendStatus, OwnedOutput, SpScanner}; +use sdk_common::sp_client::{BlindbitBackend, OutputSpendStatus, OwnedOutput, SpScanner}; +use sdk_common::updates::StateUpdater; use tokio::time::Instant; -use crate::{MutexExt, DAEMON, STORAGE, WALLET}; +use crate::CHAIN_TIP; +use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH}; pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result { let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?; @@ -236,21 +241,17 @@ fn scan_block_inputs( pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> { log::info!("Starting a rescan"); - let blindbit_client = sdk_common::sp_client::BlindbitClient::new(blindbit_url.to_owned())?; - let mut sp_wallet = WALLET - .get() - .ok_or(Error::msg("Wallet not initialized"))? - .lock_anyhow()?; - - let core = DAEMON - .get() - .ok_or(Error::msg("DAEMON not initialized"))? - .lock_anyhow()?; - - let secp = Secp256k1::new(); - let scan_height = sp_wallet.get_last_scan(); - let tip_height: u32 = core.get_current_height()?.try_into()?; + // Get all the data we need upfront, before any async operations + let (sp_wallet, scan_height, tip_height) = { + let sp_wallet = WALLET + .get() + .ok_or(Error::msg("Wallet not initialized"))? + .lock_anyhow()?; + let scan_height = sp_wallet.get_last_scan(); + let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?; + (sp_wallet.clone(), scan_height, tip_height) + }; // 0 means scan to tip if n_blocks_to_scan == 0 { @@ -267,83 +268,33 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyho if start > end { return Ok(()); } - + let updater = StateUpdater::new(); + let backend = BlindbitBackend::new(blindbit_url.to_string())?; + + let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); + + let keep_scanning = Arc::new(AtomicBool::new(true)); log::info!("start: {} end: {}", start, end); - SpScanner::new( - sp_wallet.get_sp_client().clone(), - Box::new(updater), - backend, - owned_outpoints, - keep_scanning - ); - let mut filters: Vec<(u32, BlockHash, BlockFilter)> = vec![]; - for blkheight in start..=end { - filters.push(core.get_filters(blkheight)?); - } - - let mut tweak_data_map = blindbit_client.tweaks(Height::from_consensus(start)?, Amount::from_sat(550)).await?; - - let scan_sk = sp_wallet.get_sp_client().get_scan_key(); - let start_time = Instant::now(); + let mut scanner = SpScanner::new( + sp_wallet.get_sp_client().clone(), + Box::new(updater), + Box::new(backend), + owned_outpoints, + &keep_scanning, + ); - for (blkheight, blkhash, blkfilter) in filters { - let spk2secret = match tweak_data_map.remove(&blkheight) { - Some(tweak_data_vec) => get_script_to_secret_map( - &sp_wallet.get_sp_client().sp_receiver, - tweak_data_vec, - scan_sk.into(), - &secp, - )?, - None => HashMap::new(), - }; - - // check if new possible outputs are payments to us - let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect(); - - // check if owned inputs are spent - let owned_spks: Vec> = sp_wallet - .get_outputs() - .iter() - .map(|(_, output)| { - let script = output.script.to_bytes(); - script - }) - .collect(); - - let matched = check_block(blkfilter, blkhash, candidate_spks, owned_spks)?; - - if matched { - let blk = core.get_block(blkhash)?; - - // scan block for new outputs, and add them to our list - let utxo_created_in_block = scan_block_outputs( - &sp_wallet.get_sp_client().sp_receiver, - &blk.txdata, - blkheight.into(), - spk2secret, - )?; - if !utxo_created_in_block.is_empty() { - sp_wallet.get_mut_outputs().extend(utxo_created_in_block); - } - - // update the list of outputs just in case - // utxos may be created and destroyed in the same block - // search inputs and mark as mined - let utxo_destroyed_in_block = scan_block_inputs(sp_wallet.get_outputs(), blk.txdata)?; - if !utxo_destroyed_in_block.is_empty() { - let outputs = sp_wallet.get_mut_outputs(); - for outpoint in utxo_destroyed_in_block { - if let Some(output) = outputs.get_mut(&outpoint) { - output.spend_status = - OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array()); - } - } - } - } - } + let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case + scanner + .scan_blocks( + Height::from_consensus(start)?, + Height::from_consensus(end)?, + dust_limit, + WITH_CUTTHROUGH, + ) + .await?; // time elapsed for the scan log::info!( @@ -351,14 +302,5 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyho start_time.elapsed().as_secs() ); - // update last_scan height - sp_wallet.set_last_scan(end); - STORAGE - .get() - .unwrap() - .lock_anyhow()? - .wallet_file - .save(&serde_json::to_value(sp_wallet.clone())?)?; - Ok(()) }