use std::{collections::{HashMap, HashSet}, sync::atomic::AtomicBool}; use web_time::{Duration, Instant}; use anyhow::{bail, Result}; use futures_util::Stream; use sdk_common::{backend_blindbit_wasm::{ChainBackend, SpScanner}, log::{self, info}, sp_client::{bitcoin::{absolute::Height, bip158::BlockFilter, hashes::{sha256, Hash}, secp256k1::PublicKey, Amount, BlockHash, OutPoint}, BlockData, FilterData, OutputSpendStatus, OwnedOutput, SpClient, Updater}}; pub struct WasmSpScanner<'a> { updater: Box, backend: Box, client: SpClient, keep_scanning: &'a AtomicBool, // used to interrupt scanning owned_outpoints: HashSet, // used to scan block inputs } impl<'a> WasmSpScanner<'a> { pub fn new( client: SpClient, updater: Box, backend: Box, owned_outpoints: HashSet, keep_scanning: &'a AtomicBool, ) -> Self { log::info!("Creating WasmSpScanner"); Self { client, updater, backend, owned_outpoints, keep_scanning, } } pub async fn process_blocks( &mut self, start: Height, end: Height, block_data_stream: impl Stream> + Unpin , ) -> Result<()> { use futures_util::StreamExt; let mut update_time = Instant::now(); let mut stream = block_data_stream; while let Some(blockdata) = stream.next().await { let blockdata = blockdata?; let blkheight = blockdata.blkheight; let blkhash = blockdata.blkhash; // stop scanning and return if interrupted if self.should_interrupt() { self.save_state()?; return Ok(()); } let mut save_to_storage = false; // always save on last block or after 30 seconds since last save if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { save_to_storage = true; } let (found_outputs, found_inputs) = self.process_block(blockdata).await?; if !found_outputs.is_empty() { save_to_storage = true; self.record_outputs(blkheight, blkhash, found_outputs)?; } if !found_inputs.is_empty() { save_to_storage = true; self.record_inputs(blkheight, blkhash, found_inputs)?; } // tell the updater we scanned this block self.record_progress(start, blkheight, end)?; if save_to_storage { self.save_state()?; update_time = Instant::now(); } } Ok(()) } } #[async_trait::async_trait(?Send)] impl<'a> SpScanner for WasmSpScanner<'a> { async fn scan_blocks( &mut self, start: Height, end: Height, dust_limit: Amount, with_cutthrough: bool, ) -> Result<()> { if start > end { bail!("bigger start than end: {} > {}", start, end); } info!("start: {} end: {}", start, end); let start_time= web_time::Instant::now(); // get block data stream let range = start.to_consensus_u32()..=end.to_consensus_u32(); let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough); // process blocks using block data stream self.process_blocks(start, end, block_data_stream).await?; // time elapsed for the scan info!( "Blindbit scan complete in {} seconds", start_time.elapsed().as_secs() ); Ok(()) } async fn process_block( &mut self, blockdata: BlockData, ) -> Result<(HashMap, HashSet)> { let BlockData { blkheight, tweaks, new_utxo_filter, spent_filter, .. } = blockdata; let outs = self .process_block_outputs(blkheight, tweaks, new_utxo_filter) .await?; // after processing outputs, we add the found outputs to our list self.owned_outpoints.extend(outs.keys()); let ins = self.process_block_inputs(blkheight, spent_filter).await?; // after processing inputs, we remove the found inputs self.owned_outpoints.retain(|item| !ins.contains(item)); Ok((outs, ins)) } async fn process_block_outputs( &self, blkheight: Height, tweaks: Vec, new_utxo_filter: FilterData, ) -> Result> { let mut res = HashMap::new(); if !tweaks.is_empty() { let secrets_map = self.client.get_script_to_secret_map(tweaks)?; //last_scan = last_scan.max(n as u32); let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect(); //get block gcs & check match let blkfilter = BlockFilter::new(&new_utxo_filter.data); let blkhash = new_utxo_filter.block_hash; let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?; //if match: fetch and scan utxos if matched_outputs { info!("matched outputs on: {}", blkheight); let found = self.scan_utxos(blkheight, secrets_map).await?; if !found.is_empty() { for (label, utxo, tweak) in found { let outpoint = OutPoint { txid: utxo.txid, vout: utxo.vout, }; let out = OwnedOutput { blockheight: blkheight, tweak: tweak.to_be_bytes(), amount: utxo.value, script: utxo.scriptpubkey, label, spend_status: OutputSpendStatus::Unspent, }; res.insert(outpoint, out); } } } } Ok(res) } async fn process_block_inputs( &self, blkheight: Height, spent_filter: FilterData, ) -> Result> { let mut res = HashSet::new(); let blkhash = spent_filter.block_hash; // first get the 8-byte hashes used to construct the input filter let input_hashes_map = self.get_input_hashes(blkhash)?; // check against filter let blkfilter = BlockFilter::new(&spent_filter.data); let matched_inputs = self.check_block_inputs( blkfilter, blkhash, input_hashes_map.keys().cloned().collect(), )?; // if match: download spent data, collect the outpoints that are spent if matched_inputs { info!("matched inputs on: {}", blkheight); let spent = self.backend.spent_index(blkheight).await?.data; for spent in spent { let hex: &[u8] = spent.as_ref(); if let Some(outpoint) = input_hashes_map.get(hex) { res.insert(*outpoint); } } } Ok(res) } fn get_block_data_stream( &self, range: std::ops::RangeInclusive, dust_limit: Amount, with_cutthrough: bool, ) -> std::pin::Pin>>> { self.backend .get_block_data_for_range(range, dust_limit, with_cutthrough) } fn should_interrupt(&self) -> bool { !self .keep_scanning .load(std::sync::atomic::Ordering::Relaxed) } fn save_state(&mut self) -> Result<()> { self.updater.save_to_persistent_storage() } fn record_outputs( &mut self, height: Height, block_hash: BlockHash, outputs: HashMap, ) -> Result<()> { self.updater .record_block_outputs(height, block_hash, outputs) } fn record_inputs( &mut self, height: Height, block_hash: BlockHash, inputs: HashSet, ) -> Result<()> { self.updater.record_block_inputs(height, block_hash, inputs) } fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> { self.updater.record_scan_progress(start, current, end) } fn client(&self) -> &SpClient { &self.client } fn backend(&self) -> &dyn ChainBackend { self.backend.as_ref() } fn updater(&mut self) -> &mut dyn Updater { self.updater.as_mut() } // Override the default get_input_hashes implementation to use owned_outpoints fn get_input_hashes(&self, blkhash: BlockHash) -> Result> { let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new(); for outpoint in &self.owned_outpoints { let mut arr = [0u8; 68]; arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array()); arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes()); arr[36..].copy_from_slice(&blkhash.to_byte_array()); let hash = sha256::Hash::hash(&arr); let mut res = [0u8; 8]; res.copy_from_slice(&hash[..8]); map.insert(res, outpoint.clone()); } Ok(map) } }