diff --git a/.conf.model b/.conf.model index 3c8e8fe..2c4c80e 100644 --- a/.conf.model +++ b/.conf.model @@ -5,3 +5,4 @@ network="signet" electrum_url="tcp://localhost:60601" blindbit_url="tcp://localhost:8000" zmq_url="" +blindbit_enabled=false diff --git a/src/config.rs b/src/config.rs index 31cfd5d..f8604fa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,6 +14,7 @@ pub struct Config { pub wallet_name: String, pub network: Network, pub blindbit_url: String, + pub blindbit_enabled: bool, pub zmq_url: String, pub data_dir: String, pub cookie_path: Option, @@ -68,6 +69,10 @@ impl Config { .remove("blindbit_url") .ok_or(Error::msg("No \"blindbit_url\""))? .to_owned(), + blindbit_enabled: file_content + .remove("blindbit_enabled") + .map(|s| s.to_lowercase() == "true") + .unwrap_or(true), // Par défaut activé zmq_url: file_content .remove("zmq_url") .ok_or(Error::msg("No \"zmq_url\""))? diff --git a/src/config.rs.backup b/src/config.rs.backup new file mode 100644 index 0000000..31cfd5d --- /dev/null +++ b/src/config.rs.backup @@ -0,0 +1,83 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{self, BufRead}; + +use anyhow::{Error, Result}; + +use sdk_common::sp_client::bitcoin::Network; + +#[derive(Debug)] +pub struct Config { + pub core_url: String, + pub core_wallet: Option, + pub ws_url: String, + pub wallet_name: String, + pub network: Network, + pub blindbit_url: String, + pub zmq_url: String, + pub data_dir: String, + pub cookie_path: Option, +} + +impl Config { + pub fn read_from_file(filename: &str) -> Result { + let mut file_content = HashMap::new(); + if let Ok(file) = File::open(filename) { + let reader = io::BufReader::new(file); + + // Read the file line by line + for line in reader.lines() { + if let Ok(l) = line { + // Ignore comments and empty lines + if l.starts_with('#') || l.trim().is_empty() { + continue; + } + + // Split the line into key and value + if let Some((k, v)) = l.split_once('=') { + file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned()); + } + } + } + } else { + return Err(anyhow::Error::msg("Failed to find conf file")); + } + + // Now set the Config + let config = Config { + core_url: file_content + .remove("core_url") + .ok_or(Error::msg("No \"core_url\""))? + .to_owned(), + core_wallet: file_content.remove("core_wallet").map(|s| s.to_owned()), + ws_url: file_content + .remove("ws_url") + .ok_or(Error::msg("No \"ws_url\""))? + .to_owned(), + wallet_name: file_content + .remove("wallet_name") + .ok_or(Error::msg("No \"wallet_name\""))? + .to_owned(), + network: Network::from_core_arg( + &file_content + .remove("network") + .ok_or(Error::msg("no \"network\""))? + .trim_matches('\"'), + )?, + blindbit_url: file_content + .remove("blindbit_url") + .ok_or(Error::msg("No \"blindbit_url\""))? + .to_owned(), + zmq_url: file_content + .remove("zmq_url") + .ok_or(Error::msg("No \"zmq_url\""))? + .to_owned(), + data_dir: file_content + .remove("data_dir") + .unwrap_or_else(|| ".4nk".to_string()), + cookie_path: file_content.remove("cookie_path").map(|s| s.to_owned()), + }; + + Ok(config) + } +} diff --git a/src/main.rs b/src/main.rs index 3ce3e4e..770b754 100644 --- a/src/main.rs +++ b/src/main.rs @@ -323,7 +323,7 @@ async fn handle_state_updates( } } -async fn handle_zmq(zmq_url: String, blindbit_url: String) { +async fn handle_zmq(zmq_url: String, blindbit_url: String, blindbit_enabled: bool) { debug!("Starting listening on Core"); let mut socket = zeromq::SubSocket::new(); socket.connect(&zmq_url).await.unwrap(); @@ -369,7 +369,7 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) { const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay loop { - match scan_blocks(0, &blindbit_url).await { + match scan_blocks(0, &blindbit_url, blindbit_enabled).await { Ok(_) => { debug!("Successfully scanned blocks after {} retries", retry_count); break; @@ -596,14 +596,14 @@ async fn main() -> Result<()> { if last_scan < current_tip { log::info!("Scanning for our outputs"); - scan_blocks(current_tip - last_scan, &config.blindbit_url).await?; + scan_blocks(current_tip - last_scan, &config.blindbit_url, config.blindbit_enabled).await?; } // Subscribe to Bitcoin Core let zmq_url = config.zmq_url.clone(); let blindbit_url = config.blindbit_url.clone(); tokio::spawn(async move { - handle_zmq(zmq_url, blindbit_url).await; + handle_zmq(zmq_url, blindbit_url, config.blindbit_enabled).await; }); // Create the event loop and TCP listener we'll accept connections on. diff --git a/src/scan.rs b/src/scan.rs index 556090e..6b01d4a 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -526,7 +526,7 @@ impl<'a> SpScanner for NativeSpScanner<'a> { } } -pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> { +pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, blindbit_enabled: bool) -> anyhow::Result<()> { log::info!("Starting a rescan"); // Get all the data we need upfront, before any async operations @@ -558,6 +558,10 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyho let updater = StateUpdater::new(); let backend = BlindbitBackend::new(blindbit_url.to_string())?; + if !blindbit_enabled { + log::info!("Blindbit disabled, skipping block scanning"); + return Ok(()); + } let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); diff --git a/src/scan.rs.backup b/src/scan.rs.backup new file mode 100644 index 0000000..556090e --- /dev/null +++ b/src/scan.rs.backup @@ -0,0 +1,593 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::MutexGuard; + +use anyhow::bail; +use anyhow::{Error, Result}; +use bitcoincore_rpc::bitcoin::absolute::Height; +use bitcoincore_rpc::bitcoin::hashes::sha256; +use bitcoincore_rpc::bitcoin::hashes::Hash; +use bitcoincore_rpc::bitcoin::Amount; +use futures_util::Stream; +use log::info; +use sdk_common::backend_blindbit_native::BlindbitBackend; +use sdk_common::backend_blindbit_native::ChainBackend; +use sdk_common::backend_blindbit_native::SpScanner; +use sdk_common::silentpayments::SpWallet; +use sdk_common::sp_client::bitcoin::bip158::BlockFilter; +use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey}; +use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey}; +use sdk_common::sp_client::silentpayments::receiving::Receiver; +use sdk_common::sp_client::silentpayments::utils::receiving::{ + calculate_tweak_data, get_pubkey_from_input, +}; +use sdk_common::sp_client::BlockData; +use sdk_common::sp_client::FilterData; +use sdk_common::sp_client::SpClient; +use sdk_common::sp_client::Updater; +use sdk_common::sp_client::{OutputSpendStatus, OwnedOutput}; +use sdk_common::updates::StateUpdater; +use tokio::time::Instant; + +use crate::CHAIN_TIP; +use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH}; + +pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result { + let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?; + let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len()); + let mut pubkeys: Vec = Vec::with_capacity(tx.input.len()); + // TODO we should cache transactions to prevent multiple rpc request when transaction spends multiple outputs from the same tx + for input in tx.input.iter() { + outpoints.push(( + input.previous_output.txid.to_string(), + input.previous_output.vout, + )); + let prev_tx = daemon + .lock_anyhow()? + .get_transaction(&input.previous_output.txid, None) + .map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?; + + if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) { + match get_pubkey_from_input( + &input.script_sig.to_bytes(), + &input.witness.to_vec(), + &output.script_pubkey.to_bytes(), + ) { + Ok(Some(pubkey)) => pubkeys.push(pubkey), + Ok(None) => continue, + Err(e) => { + return Err(Error::msg(format!( + "Can't extract pubkey from input: {}", + e + ))) + } + } + } else { + return Err(Error::msg("Transaction with a non-existing input")); + } + } + + let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect(); + let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?; + Ok(partial_tweak) +} + +pub fn check_transaction_alone( + mut wallet: MutexGuard, + tx: &Transaction, + tweak_data: &PublicKey, +) -> Result> { + let updates = match wallet.update_with_transaction(tx, tweak_data, 0) { + Ok(updates) => updates, + Err(e) => { + log::debug!("Error while checking transaction: {}", e); + HashMap::new() + } + }; + + if updates.len() > 0 { + let storage = STORAGE + .get() + .ok_or_else(|| Error::msg("Failed to get STORAGE"))?; + storage + .lock_anyhow()? + .wallet_file + .save(&serde_json::to_value(wallet.clone())?)?; + } + + Ok(updates) +} + +fn check_block( + blkfilter: BlockFilter, + blkhash: BlockHash, + candidate_spks: Vec<&[u8; 34]>, + owned_spks: Vec>, +) -> Result { + // check output scripts + let mut scripts_to_match: Vec<_> = candidate_spks.into_iter().map(|spk| spk.as_ref()).collect(); + + // check input scripts + scripts_to_match.extend(owned_spks.iter().map(|spk| spk.as_slice())); + + // note: match will always return true for an empty query! + if !scripts_to_match.is_empty() { + Ok(blkfilter.match_any(&blkhash, &mut scripts_to_match.into_iter())?) + } else { + Ok(false) + } +} + +fn scan_block_outputs( + sp_receiver: &Receiver, + txdata: &Vec, + blkheight: u64, + spk2secret: HashMap<[u8; 34], PublicKey>, +) -> Result> { + let mut res: HashMap = HashMap::new(); + + // loop over outputs + for tx in txdata { + let txid = tx.txid(); + + // collect all taproot outputs from transaction + let p2tr_outs: Vec<(usize, &TxOut)> = tx + .output + .iter() + .enumerate() + .filter(|(_, o)| o.script_pubkey.is_p2tr()) + .collect(); + + if p2tr_outs.is_empty() { + continue; + }; // no taproot output + + let mut secret: Option = None; + // Does this transaction contains one of the outputs we already found? + for spk in p2tr_outs.iter().map(|(_, o)| &o.script_pubkey) { + if let Some(s) = spk2secret.get(spk.as_bytes()) { + // we might have at least one output in this transaction + secret = Some(*s); + break; + } + } + + if secret.is_none() { + continue; + }; // we don't have a secret that matches any of the keys + + // Now we can just run sp_receiver on all the p2tr outputs + let xonlykeys: Result> = p2tr_outs + .iter() + .map(|(_, o)| { + XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]).map_err(Error::new) + }) + .collect(); + + let ours = sp_receiver.scan_transaction(&secret.unwrap(), xonlykeys?)?; + let height = Height::from_consensus(blkheight as u32)?; + for (label, map) in ours { + res.extend(p2tr_outs.iter().filter_map(|(i, o)| { + match XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]) { + Ok(key) => { + if let Some(scalar) = map.get(&key) { + match SecretKey::from_slice(&scalar.to_be_bytes()) { + Ok(tweak) => { + let outpoint = OutPoint { + txid, + vout: *i as u32, + }; + return Some(( + outpoint, + OwnedOutput { + blockheight: height, + tweak: tweak.secret_bytes(), + amount: o.value, + script: o.script_pubkey.clone(), + label: label.clone(), + spend_status: OutputSpendStatus::Unspent, + }, + )); + } + Err(_) => { + return None; + } + } + } + None + } + Err(_) => None, + } + })); + } + } + Ok(res) +} + +fn scan_block_inputs( + our_outputs: &HashMap, + txdata: Vec, +) -> Result> { + let mut found = vec![]; + + for tx in txdata { + for input in tx.input { + let prevout = input.previous_output; + + if our_outputs.contains_key(&prevout) { + found.push(prevout); + } + } + } + Ok(found) +} + +pub struct NativeSpScanner<'a> { + updater: Box, + backend: Box, + client: SpClient, + keep_scanning: &'a AtomicBool, // used to interrupt scanning + owned_outpoints: HashSet, // used to scan block inputs +} + +impl<'a> NativeSpScanner<'a> { + pub fn new( + client: SpClient, + updater: Box, + backend: Box, + owned_outpoints: HashSet, + keep_scanning: &'a AtomicBool, + ) -> Self { + Self { + client, + updater, + backend, + owned_outpoints, + keep_scanning, + } + } + + pub async fn process_blocks( + &mut self, + start: Height, + end: Height, + block_data_stream: impl Stream> + Unpin + Send, + ) -> Result<()> { + use futures_util::StreamExt; + use std::time::{Duration, Instant}; + + let mut update_time = Instant::now(); + let mut stream = block_data_stream; + + while let Some(blockdata) = stream.next().await { + let blockdata = blockdata?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state()?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs)?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs)?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end)?; + + if save_to_storage { + self.save_state()?; + update_time = Instant::now(); + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl<'a> SpScanner for NativeSpScanner<'a> { + async fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()> { + if start > end { + bail!("bigger start than end: {} > {}", start, end); + } + + info!("start: {} end: {}", start, end); + let start_time: Instant = Instant::now(); + + // get block data stream + let range = start.to_consensus_u32()..=end.to_consensus_u32(); + let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough); + + // process blocks using block data stream + self.process_blocks(start, end, block_data_stream).await?; + + // time elapsed for the scan + info!( + "Blindbit scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) + } + + async fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)> { + let BlockData { + blkheight, + tweaks, + new_utxo_filter, + spent_filter, + .. + } = blockdata; + + let outs = self + .process_block_outputs(blkheight, tweaks, new_utxo_filter) + .await?; + + // after processing outputs, we add the found outputs to our list + self.owned_outpoints.extend(outs.keys()); + + let ins = self.process_block_inputs(blkheight, spent_filter).await?; + + // after processing inputs, we remove the found inputs + self.owned_outpoints.retain(|item| !ins.contains(item)); + + Ok((outs, ins)) + } + + async fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result> { + let mut res = HashMap::new(); + + if !tweaks.is_empty() { + let secrets_map = self.client.get_script_to_secret_map(tweaks)?; + + //last_scan = last_scan.max(n as u32); + let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect(); + + //get block gcs & check match + let blkfilter = BlockFilter::new(&new_utxo_filter.data); + let blkhash = new_utxo_filter.block_hash; + + let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?; + + //if match: fetch and scan utxos + if matched_outputs { + info!("matched outputs on: {}", blkheight); + let found = self.scan_utxos(blkheight, secrets_map).await?; + + if !found.is_empty() { + for (label, utxo, tweak) in found { + let outpoint = OutPoint { + txid: utxo.txid, + vout: utxo.vout, + }; + + let out = OwnedOutput { + blockheight: blkheight, + tweak: tweak.to_be_bytes(), + amount: utxo.value, + script: utxo.scriptpubkey, + label, + spend_status: OutputSpendStatus::Unspent, + }; + + res.insert(outpoint, out); + } + } + } + } + Ok(res) + } + + async fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result> { + let mut res = HashSet::new(); + + let blkhash = spent_filter.block_hash; + + // first get the 8-byte hashes used to construct the input filter + let input_hashes_map = self.get_input_hashes(blkhash)?; + + // check against filter + let blkfilter = BlockFilter::new(&spent_filter.data); + let matched_inputs = self.check_block_inputs( + blkfilter, + blkhash, + input_hashes_map.keys().cloned().collect(), + )?; + + // if match: download spent data, collect the outpoints that are spent + if matched_inputs { + info!("matched inputs on: {}", blkheight); + let spent = self.backend.spent_index(blkheight).await?.data; + + for spent in spent { + let hex: &[u8] = spent.as_ref(); + + if let Some(outpoint) = input_hashes_map.get(hex) { + res.insert(*outpoint); + } + } + } + Ok(res) + } + + fn get_block_data_stream( + &self, + range: std::ops::RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> std::pin::Pin> + Send>> { + self.backend + .get_block_data_for_range(range, dust_limit, with_cutthrough) + } + + fn should_interrupt(&self) -> bool { + !self + .keep_scanning + .load(std::sync::atomic::Ordering::Relaxed) + } + + fn save_state(&mut self) -> Result<()> { + self.updater.save_to_persistent_storage() + } + + fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()> { + self.updater + .record_block_outputs(height, block_hash, outputs) + } + + fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()> { + self.updater.record_block_inputs(height, block_hash, inputs) + } + + fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> { + self.updater.record_scan_progress(start, current, end) + } + + fn client(&self) -> &SpClient { + &self.client + } + + fn backend(&self) -> &dyn ChainBackend { + self.backend.as_ref() + } + + fn updater(&mut self) -> &mut dyn Updater { + self.updater.as_mut() + } + + // Override the default get_input_hashes implementation to use owned_outpoints + fn get_input_hashes(&self, blkhash: BlockHash) -> Result> { + let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new(); + + for outpoint in &self.owned_outpoints { + let mut arr = [0u8; 68]; + arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array()); + arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes()); + arr[36..].copy_from_slice(&blkhash.to_byte_array()); + let hash = sha256::Hash::hash(&arr); + + let mut res = [0u8; 8]; + res.copy_from_slice(&hash[..8]); + + map.insert(res, outpoint.clone()); + } + + Ok(map) + } +} + +pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> { + log::info!("Starting a rescan"); + + // Get all the data we need upfront, before any async operations + let (sp_wallet, scan_height, tip_height) = { + let sp_wallet = WALLET + .get() + .ok_or(Error::msg("Wallet not initialized"))? + .lock_anyhow()?; + let scan_height = sp_wallet.get_last_scan(); + let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?; + (sp_wallet.clone(), scan_height, tip_height) + }; + + // 0 means scan to tip + if n_blocks_to_scan == 0 { + n_blocks_to_scan = tip_height - scan_height; + } + + let start = scan_height + 1; + let end = if scan_height + n_blocks_to_scan <= tip_height { + scan_height + n_blocks_to_scan + } else { + tip_height + }; + + if start > end { + return Ok(()); + } + + let updater = StateUpdater::new(); + let backend = BlindbitBackend::new(blindbit_url.to_string())?; + + let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); + + let keep_scanning = Arc::new(AtomicBool::new(true)); + + log::info!("start: {} end: {}", start, end); + let start_time = Instant::now(); + let mut scanner = NativeSpScanner::new( + sp_wallet.get_sp_client().clone(), + Box::new(updater), + Box::new(backend), + owned_outpoints, + &keep_scanning, + ); + + let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case + scanner + .scan_blocks( + Height::from_consensus(start)?, + Height::from_consensus(end)?, + dust_limit, + WITH_CUTTHROUGH, + ) + .await?; + + // time elapsed for the scan + log::info!( + "Scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) +}