use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;

use anyhow::bail;
use anyhow::{Error, Result};
use bitcoincore_rpc::bitcoin::absolute::Height;
use bitcoincore_rpc::bitcoin::hashes::sha256;
use bitcoincore_rpc::bitcoin::hashes::Hash;
use bitcoincore_rpc::bitcoin::Amount;
use futures_util::Stream;
use log::info;
use sdk_common::backend_blindbit_native::BlindbitBackend;
use sdk_common::backend_blindbit_native::ChainBackend;
use sdk_common::backend_blindbit_native::SpScanner;
use sdk_common::silentpayments::SpWallet;
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
use sdk_common::sp_client::silentpayments::receiving::Receiver;
use sdk_common::sp_client::silentpayments::utils::receiving::{
    calculate_tweak_data, get_pubkey_from_input,
};
use sdk_common::sp_client::BlockData;
use sdk_common::sp_client::FilterData;
use sdk_common::sp_client::SpClient;
use sdk_common::sp_client::Updater;
use sdk_common::sp_client::{OutputSpendStatus, OwnedOutput};
use sdk_common::updates::StateUpdater;
use tokio::time::Instant;

use crate::CHAIN_TIP;
use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH};

pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
    let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
    let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
    let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
    // TODO: cache transactions to prevent multiple RPC requests when a
    // transaction spends several outputs of the same previous transaction.
    for input in tx.input.iter() {
        outpoints.push((
            input.previous_output.txid.to_string(),
            input.previous_output.vout,
        ));
        let prev_tx = daemon
            .lock_anyhow()?
            .get_transaction(&input.previous_output.txid, None)
            .map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?;
        if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
            match get_pubkey_from_input(
                &input.script_sig.to_bytes(),
                &input.witness.to_vec(),
                &output.script_pubkey.to_bytes(),
            ) {
                Ok(Some(pubkey)) => pubkeys.push(pubkey),
                Ok(None) => continue,
                Err(e) => {
                    return Err(Error::msg(format!(
                        "Can't extract pubkey from input: {}",
                        e
                    )))
                }
            }
        } else {
            return Err(Error::msg("Transaction with a non-existing input"));
        }
    }
    let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
    let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
    Ok(partial_tweak)
}

pub fn check_transaction_alone(
    mut wallet: MutexGuard<SpWallet>,
    tx: &Transaction,
    tweak_data: &PublicKey,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
    let updates = match wallet.update_with_transaction(tx, tweak_data, 0) {
        Ok(updates) => updates,
        Err(e) => {
            log::debug!("Error while checking transaction: {}", e);
            HashMap::new()
        }
    };
    if !updates.is_empty() {
        let storage = STORAGE
            .get()
            .ok_or_else(|| Error::msg("Failed to get STORAGE"))?;
        storage
            .lock_anyhow()?
            .wallet_file
            .save(&serde_json::to_value(wallet.clone())?)?;
    }
    Ok(updates)
}
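/// Matches a block's BIP 158 filter against both our candidate output
/// scripts and the scripts of outputs we already own (so spends of our
/// coins are caught too). A minimal sketch of the underlying call, with
/// hypothetical `filter_bytes`, `block_hash` and `scripts` values:
///
/// ```ignore
/// use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
///
/// let filter = BlockFilter::new(&filter_bytes);
/// let mut query = scripts.iter().map(|s: &Vec<u8>| s.as_slice());
/// // match_any answers "could any of these scripts be in this block?";
/// // false positives are possible, false negatives are not.
/// let maybe_relevant = filter.match_any(&block_hash, &mut query)?;
/// ```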
fn check_block(
    blkfilter: BlockFilter,
    blkhash: BlockHash,
    candidate_spks: Vec<&[u8; 34]>,
    owned_spks: Vec<Vec<u8>>,
) -> Result<bool> {
    // check output scripts
    let mut scripts_to_match: Vec<_> = candidate_spks.into_iter().map(|spk| spk.as_ref()).collect();
    // check input scripts
    scripts_to_match.extend(owned_spks.iter().map(|spk| spk.as_slice()));
    // note: match will always return true for an empty query!
    if !scripts_to_match.is_empty() {
        Ok(blkfilter.match_any(&blkhash, &mut scripts_to_match.into_iter())?)
    } else {
        Ok(false)
    }
}

fn scan_block_outputs(
    sp_receiver: &Receiver,
    txdata: &Vec<Transaction>,
    blkheight: u64,
    spk2secret: HashMap<[u8; 34], PublicKey>,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
    let mut res: HashMap<OutPoint, OwnedOutput> = HashMap::new();
    // loop over outputs
    for tx in txdata {
        let txid = tx.txid();
        // collect all taproot outputs from the transaction
        let p2tr_outs: Vec<(usize, &TxOut)> = tx
            .output
            .iter()
            .enumerate()
            .filter(|(_, o)| o.script_pubkey.is_p2tr())
            .collect();
        if p2tr_outs.is_empty() {
            continue; // no taproot output
        }
        // Does this transaction contain one of the outputs we are looking for?
        let mut secret: Option<PublicKey> = None;
        for spk in p2tr_outs.iter().map(|(_, o)| &o.script_pubkey) {
            if let Some(s) = spk2secret.get(spk.as_bytes()) {
                // we might have at least one output in this transaction
                secret = Some(*s);
                break;
            }
        }
        if secret.is_none() {
            continue; // we don't have a secret that matches any of the keys
        }
        // Now we can just run sp_receiver on all the p2tr outputs
        let xonlykeys: Result<Vec<XOnlyPublicKey>> = p2tr_outs
            .iter()
            .map(|(_, o)| {
                XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]).map_err(Error::new)
            })
            .collect();
        let ours = sp_receiver.scan_transaction(&secret.unwrap(), xonlykeys?)?;
        let height = Height::from_consensus(blkheight as u32)?;
        for (label, map) in ours {
            res.extend(p2tr_outs.iter().filter_map(|(i, o)| {
                match XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]) {
                    Ok(key) => {
                        if let Some(scalar) = map.get(&key) {
                            match SecretKey::from_slice(&scalar.to_be_bytes()) {
                                Ok(tweak) => {
                                    let outpoint = OutPoint {
                                        txid,
                                        vout: *i as u32,
                                    };
                                    return Some((
                                        outpoint,
                                        OwnedOutput {
                                            blockheight: height,
                                            tweak: tweak.secret_bytes(),
                                            amount: o.value,
                                            script: o.script_pubkey.clone(),
                                            label: label.clone(),
                                            spend_status: OutputSpendStatus::Unspent,
                                        },
                                    ));
                                }
                                Err(_) => return None,
                            }
                        }
                        None
                    }
                    Err(_) => None,
                }
            }));
        }
    }
    Ok(res)
}

fn scan_block_inputs(
    our_outputs: &HashMap<OutPoint, OwnedOutput>,
    txdata: Vec<Transaction>,
) -> Result<Vec<OutPoint>> {
    let mut found = vec![];
    for tx in txdata {
        for input in tx.input {
            let prevout = input.previous_output;
            if our_outputs.contains_key(&prevout) {
                found.push(prevout);
            }
        }
    }
    Ok(found)
}

pub struct NativeSpScanner<'a> {
    updater: Box<dyn Updater + Send + Sync>,
    backend: Box<dyn ChainBackend + Send + Sync>,
    client: SpClient,
    keep_scanning: &'a AtomicBool,      // used to interrupt scanning
    owned_outpoints: HashSet<OutPoint>, // used to scan block inputs
}

impl<'a> NativeSpScanner<'a> {
    pub fn new(
        client: SpClient,
        updater: Box<dyn Updater + Send + Sync>,
        backend: Box<dyn ChainBackend + Send + Sync>,
        owned_outpoints: HashSet<OutPoint>,
        keep_scanning: &'a AtomicBool,
    ) -> Self {
        Self {
            client,
            updater,
            backend,
            owned_outpoints,
            keep_scanning,
        }
    }

    pub async fn process_blocks(
        &mut self,
        start: Height,
        end: Height,
        block_data_stream: impl Stream<Item = Result<BlockData>> + Unpin + Send,
    ) -> Result<()> {
        use futures_util::StreamExt;
        use std::time::{Duration, Instant};

        let mut update_time = Instant::now();
        let mut stream = block_data_stream;
        while let Some(blockdata) = stream.next().await {
            let blockdata = blockdata?;
            let blkheight = blockdata.blkheight;
            let blkhash = blockdata.blkhash;

            // stop scanning and return if interrupted
            if self.should_interrupt() {
                self.save_state()?;
                return Ok(());
            }

            let mut save_to_storage = false;
            // always save on the last block or after 30 seconds since the last save
            if blkheight == end || update_time.elapsed() > Duration::from_secs(30) {
                save_to_storage = true;
            }

            let (found_outputs, found_inputs) = self.process_block(blockdata).await?;
            if !found_outputs.is_empty() {
                save_to_storage = true;
                self.record_outputs(blkheight, blkhash, found_outputs)?;
            }
            if !found_inputs.is_empty() {
                save_to_storage = true;
                self.record_inputs(blkheight, blkhash, found_inputs)?;
            }

            // tell the updater we scanned this block
            self.record_progress(start, blkheight, end)?;

            if save_to_storage {
                self.save_state()?;
                update_time = Instant::now();
            }
        }
        Ok(())
    }
}
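// Example (sketch): cooperatively stopping a scan in flight. The same
// `AtomicBool` handed to `NativeSpScanner::new` can be flipped from another
// task; `process_blocks` checks it before each block, saves state and
// returns early. The 60-second timeout below is a hypothetical value.
//
// ```ignore
// let keep_scanning = Arc::new(AtomicBool::new(true));
// let stop_flag = keep_scanning.clone();
// tokio::spawn(async move {
//     tokio::time::sleep(std::time::Duration::from_secs(60)).await;
//     stop_flag.store(false, Ordering::Relaxed); // scanner exits at the next block
// });
// ```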
#[async_trait::async_trait]
impl<'a> SpScanner for NativeSpScanner<'a> {
    async fn scan_blocks(
        &mut self,
        start: Height,
        end: Height,
        dust_limit: Amount,
        with_cutthrough: bool,
    ) -> Result<()> {
        if start > end {
            bail!("bigger start than end: {} > {}", start, end);
        }
        info!("start: {} end: {}", start, end);
        let start_time: Instant = Instant::now();

        // get block data stream
        let range = start.to_consensus_u32()..=end.to_consensus_u32();
        let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough);

        // process blocks using the block data stream
        self.process_blocks(start, end, block_data_stream).await?;

        // time elapsed for the scan
        info!(
            "Blindbit scan complete in {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }

    async fn process_block(
        &mut self,
        blockdata: BlockData,
    ) -> Result<(HashMap<OutPoint, OwnedOutput>, HashSet<OutPoint>)> {
        let BlockData {
            blkheight,
            tweaks,
            new_utxo_filter,
            spent_filter,
            ..
        } = blockdata;

        let outs = self
            .process_block_outputs(blkheight, tweaks, new_utxo_filter)
            .await?;
        // after processing outputs, we add the found outputs to our list
        self.owned_outpoints.extend(outs.keys());

        let ins = self.process_block_inputs(blkheight, spent_filter).await?;
        // after processing inputs, we remove the found inputs
        self.owned_outpoints.retain(|item| !ins.contains(item));

        Ok((outs, ins))
    }

    async fn process_block_outputs(
        &self,
        blkheight: Height,
        tweaks: Vec<PublicKey>,
        new_utxo_filter: FilterData,
    ) -> Result<HashMap<OutPoint, OwnedOutput>> {
        let mut res = HashMap::new();
        if !tweaks.is_empty() {
            let secrets_map = self.client.get_script_to_secret_map(tweaks)?;
            //last_scan = last_scan.max(n as u32);
            let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect();

            // get the block's GCS filter and check for a match
            let blkfilter = BlockFilter::new(&new_utxo_filter.data);
            let blkhash = new_utxo_filter.block_hash;
            let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?;

            // if it matches, fetch and scan the utxos
            if matched_outputs {
                info!("matched outputs on: {}", blkheight);
                let found = self.scan_utxos(blkheight, secrets_map).await?;
                if !found.is_empty() {
                    for (label, utxo, tweak) in found {
                        let outpoint = OutPoint {
                            txid: utxo.txid,
                            vout: utxo.vout,
                        };
                        let out = OwnedOutput {
                            blockheight: blkheight,
                            tweak: tweak.to_be_bytes(),
                            amount: utxo.value,
                            script: utxo.scriptpubkey,
                            label,
                            spend_status: OutputSpendStatus::Unspent,
                        };
                        res.insert(outpoint, out);
                    }
                }
            }
        }
        Ok(res)
    }

    async fn process_block_inputs(
        &self,
        blkheight: Height,
        spent_filter: FilterData,
    ) -> Result<HashSet<OutPoint>> {
        let mut res = HashSet::new();
        let blkhash = spent_filter.block_hash;

        // first get the 8-byte hashes used to construct the input filter
        let input_hashes_map = self.get_input_hashes(blkhash)?;

        // check against the filter
        let blkfilter = BlockFilter::new(&spent_filter.data);
        let matched_inputs = self.check_block_inputs(
            blkfilter,
            blkhash,
            input_hashes_map.keys().cloned().collect(),
        )?;

        // if it matches, download the spent data and collect the outpoints that are spent
        if matched_inputs {
            info!("matched inputs on: {}", blkheight);
            let spent = self.backend.spent_index(blkheight).await?.data;
            for spent in spent {
                let hex: &[u8] = spent.as_ref();
                if let Some(outpoint) = input_hashes_map.get(hex) {
                    res.insert(*outpoint);
                }
            }
        }
        Ok(res)
    }
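    /// Streams per-block scan data (tweaks, new-utxo filter, spent filter)
    /// from the backend for the given height range. A sketch of consuming
    /// the stream directly, with a hypothetical range and dust limit:
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let mut stream = scanner.get_block_data_stream(
    ///     840_000..=840_100,
    ///     Amount::from_sat(546),
    ///     false, // with_cutthrough
    /// );
    /// while let Some(block) = stream.next().await {
    ///     let block = block?; // each item is a Result<BlockData>
    ///     println!("height: {}", block.blkheight);
    /// }
    /// ```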
    fn get_block_data_stream(
        &self,
        range: std::ops::RangeInclusive<u32>,
        dust_limit: Amount,
        with_cutthrough: bool,
    ) -> std::pin::Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>> {
        self.backend
            .get_block_data_for_range(range, dust_limit, with_cutthrough)
    }

    fn should_interrupt(&self) -> bool {
        !self
            .keep_scanning
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn save_state(&mut self) -> Result<()> {
        self.updater.save_to_persistent_storage()
    }

    fn record_outputs(
        &mut self,
        height: Height,
        block_hash: BlockHash,
        outputs: HashMap<OutPoint, OwnedOutput>,
    ) -> Result<()> {
        self.updater
            .record_block_outputs(height, block_hash, outputs)
    }

    fn record_inputs(
        &mut self,
        height: Height,
        block_hash: BlockHash,
        inputs: HashSet<OutPoint>,
    ) -> Result<()> {
        self.updater.record_block_inputs(height, block_hash, inputs)
    }

    fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> {
        self.updater.record_scan_progress(start, current, end)
    }

    fn client(&self) -> &SpClient {
        &self.client
    }

    fn backend(&self) -> &dyn ChainBackend {
        self.backend.as_ref()
    }

    fn updater(&mut self) -> &mut dyn Updater {
        self.updater.as_mut()
    }

    // Override the default get_input_hashes implementation to use owned_outpoints
    fn get_input_hashes(&self, blkhash: BlockHash) -> Result<HashMap<[u8; 8], OutPoint>> {
        let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new();
        for outpoint in &self.owned_outpoints {
            // hash txid (32 bytes) || vout (4 bytes, little endian) || blkhash (32 bytes)
            let mut arr = [0u8; 68];
            arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array());
            arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes());
            arr[36..].copy_from_slice(&blkhash.to_byte_array());
            let hash = sha256::Hash::hash(&arr);
            // keep only the first 8 bytes, matching the backend's spent-index entries
            let mut res = [0u8; 8];
            res.copy_from_slice(&hash[..8]);
            map.insert(res, *outpoint);
        }
        Ok(map)
    }
}
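/// Rescans the chain for silent payment outputs, starting right after the
/// wallet's last scanned height. `n_blocks_to_scan == 0` means "scan to the
/// tip". A sketch of typical calls, assuming WALLET, DAEMON and CHAIN_TIP
/// are initialized (the blindbit URL is a hypothetical local instance):
///
/// ```ignore
/// // scan everything between last_scan + 1 and the current tip
/// scan_blocks(0, "http://localhost:8000").await?;
///
/// // or scan at most the next 1000 blocks
/// scan_blocks(1000, "http://localhost:8000").await?;
/// ```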
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> {
    log::info!("Starting a rescan");

    // Get all the data we need upfront, before any async operations
    let (sp_wallet, scan_height, tip_height) = {
        let sp_wallet = WALLET
            .get()
            .ok_or(Error::msg("Wallet not initialized"))?
            .lock_anyhow()?;
        let scan_height = sp_wallet.get_last_scan();
        let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?;
        (sp_wallet.clone(), scan_height, tip_height)
    };

    // 0 means scan to tip
    if n_blocks_to_scan == 0 {
        n_blocks_to_scan = tip_height - scan_height;
    }

    let start = scan_height + 1;
    let end = if scan_height + n_blocks_to_scan <= tip_height {
        scan_height + n_blocks_to_scan
    } else {
        tip_height
    };

    if start > end {
        return Ok(());
    }

    let updater = StateUpdater::new();
    let backend = BlindbitBackend::new(blindbit_url.to_string())?;
    let owned_outpoints = sp_wallet.get_unspent_outputs().keys().copied().collect();

    let keep_scanning = Arc::new(AtomicBool::new(true));

    log::info!("start: {} end: {}", start, end);
    let start_time = Instant::now();
    let mut scanner = NativeSpScanner::new(
        sp_wallet.get_sp_client().clone(),
        Box::new(updater),
        Box::new(backend),
        owned_outpoints,
        &keep_scanning,
    );

    let dust_limit = Amount::from_sat(0); // we don't really have a dust limit for this use case
    scanner
        .scan_blocks(
            Height::from_consensus(start)?,
            Height::from_consensus(end)?,
            dust_limit,
            WITH_CUTTHROUGH,
        )
        .await?;

    // time elapsed for the scan
    log::info!(
        "Scan complete in {} seconds",
        start_time.elapsed().as_secs()
    );
    Ok(())
}
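// Example (sketch): checking a single unconfirmed transaction against the
// wallet without a full rescan, combining the two helpers at the top of this
// file. Assumes DAEMON, WALLET and STORAGE are initialized.
//
// ```ignore
// let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
// let wallet_guard = WALLET
//     .get()
//     .ok_or(Error::msg("Wallet not initialized"))?
//     .lock_anyhow()?;
// let updates = check_transaction_alone(wallet_guard, &tx, &partial_tweak)?;
// if !updates.is_empty() {
//     log::info!("found {} new output(s) in {}", updates.len(), tx.txid());
// }
// ```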