From 76f16a1d98b11fa7be8df51f9b71fd413238c776 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Tue, 24 Jun 2025 17:24:31 +0200 Subject: [PATCH] Impl WasmSpScanner --- Cargo.toml | 4 + src/wallet.rs | 288 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 283 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a33142..0d300a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ crate-type = ["lib", "cdylib"] [dependencies] anyhow = "1.0" +async-trait = "0.1" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0" wasm-bindgen = "0.2.91" @@ -19,6 +20,9 @@ tsify = { git = "https://github.com/Sosthene00/tsify", branch = "next" } # sdk_common = { path = "../sdk_common" } sdk_common = { git = "https://git.4nkweb.com/4nk/sdk_common.git", branch = "dev" } serde-wasm-bindgen = "0.6.5" +wasm-bindgen-futures = "0.4" +futures = "0.3" +web-sys = { version = "0.3", features = ["Window"] } [dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/src/wallet.rs b/src/wallet.rs index 54f62eb..29dcbc3 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1,17 +1,17 @@ use std::{ - collections::HashSet, - sync::{Mutex, MutexGuard, OnceLock}, + collections::{HashMap, HashSet}, ops::RangeInclusive, pin::Pin, sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard, OnceLock}, time::Instant }; -use anyhow::Error; +use anyhow::{bail, Result, Error}; +use futures::Stream; use rand::Rng; -use sdk_common::sp_client::{ - bitcoin::{secp256k1::SecretKey, Network, OutPoint}, - silentpayments::SilentPaymentAddress, - SpClient, SpendKey, -}; +use sdk_common::{log, sp_client::{ + bitcoin::{absolute::Height, hashes::{sha256, Hash}, secp256k1::{PublicKey, SecretKey}, Amount, BlockHash, Network, OutPoint}, silentpayments::SilentPaymentAddress, BlindbitBackend, BlockData, FilterData, OutputSpendStatus, OwnedOutput, SpClient, SpScanner, SpendKey, SpentIndexData, Updater, UtxoData +}, updates::StateUpdater}; -use crate::MutexExt; +use sdk_common::sp_client::ChainBackendWasm; + +use crate::{user::lock_local_device, MutexExt}; pub static FREEZED_UTXOS: OnceLock>> = OnceLock::new(); @@ -29,3 +29,273 @@ pub fn generate_sp_wallet(network: Network) -> anyhow::Result { network, ) } + +pub struct WasmSpScanner<'a> { + updater: Box, + backend: BlindbitBackend, + client: SpClient, + keep_scanning: &'a AtomicBool, + owned_outpoints: HashSet, +} + +impl<'a> WasmSpScanner<'a> { + pub fn new( + client: SpClient, + updater: Box, + backend: BlindbitBackend, + owned_outpoints: HashSet, + keep_scanning: &'a AtomicBool, + ) -> Self { + Self { + client, + updater, + backend, + owned_outpoints, + keep_scanning, + } + } + + pub async fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()> { + if start > end { + bail!("bigger start than end: {} > {}", start, end); + } + + log::info!("start: {} end: {}", start, end); + let start_time: Instant = Instant::now(); + + // get block data stream + let range = start.to_consensus_u32()..=end.to_consensus_u32(); + let block_data_stream = self.backend.get_block_data_for_range(range, dust_limit, with_cutthrough); + + // process blocks using block data stream + self.process_blocks(start, end, block_data_stream).await?; + + // time elapsed for the scan + log::info!( + "Blindbit scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) + } + + async fn process_blocks( + &mut self, + start: Height, + end: Height, + block_data_stream: Pin>>>, + ) -> Result<()> { + use sdk_common::sp_client::futures::StreamExt; + use std::time::{Duration, Instant}; + + let mut update_time = Instant::now(); + let mut stream = block_data_stream; + + while let Some(blockdata) = stream.next().await { + let blockdata = blockdata?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state()?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs)?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs)?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end)?; + + if save_to_storage { + self.save_state()?; + update_time = Instant::now(); + } + } + + Ok(()) + } + + async fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)> { + let BlockData { + blkheight, + tweaks, + new_utxo_filter, + spent_filter, + .. + } = blockdata; + + let outs = self + .process_block_outputs(blkheight, tweaks, new_utxo_filter) + .await?; + + // after processing outputs, we add the found outputs to our list + self.owned_outpoints.extend(outs.keys()); + + let ins = self.process_block_inputs(blkheight, spent_filter).await?; + + // after processing inputs, we remove the found inputs + self.owned_outpoints.retain(|item| !ins.contains(item)); + + Ok((outs, ins)) + } + + async fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result> { + // Implementation for processing block outputs + // This is a placeholder - you'll need to implement the actual logic + Ok(HashMap::new()) + } + + async fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result> { + // Implementation for processing block inputs + // This is a placeholder - you'll need to implement the actual logic + Ok(HashSet::new()) + } + + fn should_interrupt(&self) -> bool { + !self + .keep_scanning + .load(std::sync::atomic::Ordering::Relaxed) + } + + fn save_state(&mut self) -> Result<()> { + self.updater.save_to_persistent_storage() + } + + fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()> { + self.updater.record_block_outputs(height, block_hash, outputs) + } + + fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()> { + self.updater.record_block_inputs(height, block_hash, inputs) + } + + fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> { + self.updater.record_scan_progress(start, current, end) + } + + // Override the default get_input_hashes implementation to use owned_outpoints + fn get_input_hashes(&self, blkhash: BlockHash) -> Result> { + let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new(); + + for outpoint in &self.owned_outpoints { + let mut arr = [0u8; 68]; + arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array()); + arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes()); + arr[36..].copy_from_slice(&blkhash.to_byte_array()); + let hash = sha256::Hash::hash(&arr); + + let mut res = [0u8; 8]; + res.copy_from_slice(&hash[..8]); + + map.insert(res, outpoint.clone()); + } + + Ok(map) + } +} + +pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, tip_height: u32, with_cutthrough: bool) -> anyhow::Result<()> { + log::info!("Starting a rescan"); + + // Get all the data we need upfront, before any async operations + let device = lock_local_device()?; + let sp_wallet = device.get_sp_wallet(); + let scan_height = sp_wallet.get_last_scan(); + + // 0 means scan to tip + if n_blocks_to_scan == 0 { + n_blocks_to_scan = tip_height - scan_height; + } + + let start = scan_height + 1; + let end = if scan_height + n_blocks_to_scan <= tip_height { + scan_height + n_blocks_to_scan + } else { + tip_height + }; + + if start > end { + return Ok(()); + } + + let updater = StateUpdater::new(); + let backend = BlindbitBackend::new(blindbit_url.to_string())?; + + let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); + + let keep_scanning = Arc::new(AtomicBool::new(true)); + + log::info!("start: {} end: {}", start, end); + let start_time = Instant::now(); + let mut scanner = WasmSpScanner::new( + sp_wallet.get_sp_client().clone(), + Box::new(updater), + backend, + owned_outpoints, + &keep_scanning, + ); + + let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case + scanner + .scan_blocks( + Height::from_consensus(start)?, + Height::from_consensus(end)?, + dust_limit, + with_cutthrough, + ) + .await?; + + // time elapsed for the scan + log::info!( + "Scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) +}