From 5f3c6a0e0b39d68647f4266bc519c9d2189fd106 Mon Sep 17 00:00:00 2001 From: Sosthene Date: Sat, 23 Aug 2025 15:25:09 +0200 Subject: [PATCH] Add scanner mod --- src/lib.rs | 1 + src/scanner.rs | 306 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 src/scanner.rs diff --git a/src/lib.rs b/src/lib.rs index 63c7a6f..b41a382 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,3 +6,4 @@ pub mod api; mod peers; mod user; mod wallet; +mod scanner; diff --git a/src/scanner.rs b/src/scanner.rs new file mode 100644 index 0000000..ad5a627 --- /dev/null +++ b/src/scanner.rs @@ -0,0 +1,306 @@ +use std::{collections::{HashMap, HashSet}, sync::atomic::AtomicBool}; +use web_time::{Duration, Instant}; + +use anyhow::{bail, Result}; +use futures_util::Stream; +use sdk_common::{backend_blindbit_wasm::{ChainBackend, SpScanner}, log::{self, info}, sp_client::{bitcoin::{absolute::Height, bip158::BlockFilter, hashes::{sha256, Hash}, secp256k1::PublicKey, Amount, BlockHash, OutPoint}, BlockData, FilterData, OutputSpendStatus, OwnedOutput, SpClient, Updater}}; + +pub struct WasmSpScanner<'a> { + updater: Box, + backend: Box, + client: SpClient, + keep_scanning: &'a AtomicBool, // used to interrupt scanning + owned_outpoints: HashSet, // used to scan block inputs +} + +impl<'a> WasmSpScanner<'a> { + pub fn new( + client: SpClient, + updater: Box, + backend: Box, + owned_outpoints: HashSet, + keep_scanning: &'a AtomicBool, + ) -> Self { + log::info!("Creating WasmSpScanner"); + Self { + client, + updater, + backend, + owned_outpoints, + keep_scanning, + } + } + + pub async fn process_blocks( + &mut self, + start: Height, + end: Height, + block_data_stream: impl Stream> + Unpin , + ) -> Result<()> { + use futures_util::StreamExt; + + let mut update_time = Instant::now(); + let mut stream = block_data_stream; + + while let Some(blockdata) = stream.next().await { + let blockdata = blockdata?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state()?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs)?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs)?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end)?; + + if save_to_storage { + self.save_state()?; + update_time = Instant::now(); + } + } + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl<'a> SpScanner for WasmSpScanner<'a> { + async fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()> { + if start > end { + bail!("bigger start than end: {} > {}", start, end); + } + + info!("start: {} end: {}", start, end); + let start_time= web_time::Instant::now(); + + // get block data stream + let range = start.to_consensus_u32()..=end.to_consensus_u32(); + let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough); + + // process blocks using block data stream + self.process_blocks(start, end, block_data_stream).await?; + + // time elapsed for the scan + info!( + "Blindbit scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) + } + + async fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)> { + let BlockData { + blkheight, + tweaks, + new_utxo_filter, + spent_filter, + .. + } = blockdata; + + let outs = self + .process_block_outputs(blkheight, tweaks, new_utxo_filter) + .await?; + + // after processing outputs, we add the found outputs to our list + self.owned_outpoints.extend(outs.keys()); + + let ins = self.process_block_inputs(blkheight, spent_filter).await?; + + // after processing inputs, we remove the found inputs + self.owned_outpoints.retain(|item| !ins.contains(item)); + + Ok((outs, ins)) + } + + async fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result> { + let mut res = HashMap::new(); + + if !tweaks.is_empty() { + let secrets_map = self.client.get_script_to_secret_map(tweaks)?; + + //last_scan = last_scan.max(n as u32); + let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect(); + + //get block gcs & check match + let blkfilter = BlockFilter::new(&new_utxo_filter.data); + let blkhash = new_utxo_filter.block_hash; + + let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?; + + //if match: fetch and scan utxos + if matched_outputs { + info!("matched outputs on: {}", blkheight); + let found = self.scan_utxos(blkheight, secrets_map).await?; + + if !found.is_empty() { + for (label, utxo, tweak) in found { + let outpoint = OutPoint { + txid: utxo.txid, + vout: utxo.vout, + }; + + let out = OwnedOutput { + blockheight: blkheight, + tweak: tweak.to_be_bytes(), + amount: utxo.value, + script: utxo.scriptpubkey, + label, + spend_status: OutputSpendStatus::Unspent, + }; + + res.insert(outpoint, out); + } + } + } + } + Ok(res) + } + + async fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result> { + let mut res = HashSet::new(); + + let blkhash = spent_filter.block_hash; + + // first get the 8-byte hashes used to construct the input filter + let input_hashes_map = self.get_input_hashes(blkhash)?; + + // check against filter + let blkfilter = BlockFilter::new(&spent_filter.data); + let matched_inputs = self.check_block_inputs( + blkfilter, + blkhash, + input_hashes_map.keys().cloned().collect(), + )?; + + // if match: download spent data, collect the outpoints that are spent + if matched_inputs { + info!("matched inputs on: {}", blkheight); + let spent = self.backend.spent_index(blkheight).await?.data; + + for spent in spent { + let hex: &[u8] = spent.as_ref(); + + if let Some(outpoint) = input_hashes_map.get(hex) { + res.insert(*outpoint); + } + } + } + Ok(res) + } + + fn get_block_data_stream( + &self, + range: std::ops::RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> std::pin::Pin>>> { + self.backend + .get_block_data_for_range(range, dust_limit, with_cutthrough) + } + + fn should_interrupt(&self) -> bool { + !self + .keep_scanning + .load(std::sync::atomic::Ordering::Relaxed) + } + + fn save_state(&mut self) -> Result<()> { + self.updater.save_to_persistent_storage() + } + + fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()> { + self.updater + .record_block_outputs(height, block_hash, outputs) + } + + fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()> { + self.updater.record_block_inputs(height, block_hash, inputs) + } + + fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> { + self.updater.record_scan_progress(start, current, end) + } + + fn client(&self) -> &SpClient { + &self.client + } + + fn backend(&self) -> &dyn ChainBackend { + self.backend.as_ref() + } + + fn updater(&mut self) -> &mut dyn Updater { + self.updater.as_mut() + } + + // Override the default get_input_hashes implementation to use owned_outpoints + fn get_input_hashes(&self, blkhash: BlockHash) -> Result> { + let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new(); + + for outpoint in &self.owned_outpoints { + let mut arr = [0u8; 68]; + arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array()); + arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes()); + arr[36..].copy_from_slice(&blkhash.to_byte_array()); + let hash = sha256::Hash::hash(&arr); + + let mut res = [0u8; 8]; + res.copy_from_slice(&hash[..8]); + + map.insert(res, outpoint.clone()); + } + + Ok(map) + } +}