diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..86e8b12 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,5 @@ +[build] +target = "wasm32-unknown-unknown" + +[target.wasm32-unknown-unknown] +rustflags = ["--cfg", "web_sys_unstable_apis"] diff --git a/Cargo.toml b/Cargo.toml index 4a33142..c7ad497 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ crate-type = ["lib", "cdylib"] [dependencies] anyhow = "1.0" +async-trait = "0.1" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0" wasm-bindgen = "0.2.91" @@ -16,9 +17,10 @@ getrandom = { version="0.2.12", features = ["js"] } wasm-logger = "0.2.0" rand = "0.8.5" tsify = { git = "https://github.com/Sosthene00/tsify", branch = "next" } -# sdk_common = { path = "../sdk_common" } -sdk_common = { git = "https://git.4nkweb.com/4nk/sdk_common.git", branch = "dev" } +sdk_common = { git = "https://git.4nkweb.com/4nk/sdk_common.git", branch = "dev", features = ["blindbit-wasm"] } serde-wasm-bindgen = "0.6.5" +futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] } +web-time = "1.1.0" [dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/src/api.rs b/src/api.rs index 0edc048..fe6d023 100644 --- a/src/api.rs +++ b/src/api.rs @@ -6,7 +6,7 @@ use std::ops::Index; use std::str::FromStr; use std::string::FromUtf8Error; use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError}; -use std::time::{Duration, Instant}; +use web_time::{Duration, Instant}; use std::u32; use rand::{thread_rng, Fill, Rng, RngCore}; @@ -15,6 +15,7 @@ use sdk_common::aes_gcm::aes::cipher::ArrayLength; use sdk_common::aes_gcm::Nonce; use sdk_common::hash::AnkPcdHash; use sdk_common::log::{self, debug, info, warn}; +use sdk_common::backend_blindbit_wasm::wasm_bindgen_futures; use anyhow::{anyhow, Context}; use anyhow::Error as AnyhowError; @@ -68,12 +69,12 @@ use sdk_common::pcd::{ DataType, FileBlob, Member, Pcd, PcdCommitments, RoleDefinition, Roles, ValidationRule, PCD_VERSION, PcdSerializable }; use sdk_common::prd::{AnkPrdHash, Prd, PrdType}; -use sdk_common::silentpayments::{create_transaction as internal_create_transaction, sign_transaction as internal_sign_tx, TsUnsignedTransaction}; +use sdk_common::silentpayments::{create_transaction as internal_create_transaction, sign_transaction as internal_sign_tx, SpWallet, TsUnsignedTransaction}; use sdk_common::sp_client::{FeeRate, OutputSpendStatus, OwnedOutput, Recipient, RecipientAddress, SilentPaymentUnsignedTransaction, SpClient, SpendKey}; use sdk_common::secrets::SecretsStore; use crate::user::{lock_local_device, set_new_device, LOCAL_DEVICE}; -use crate::wallet::{generate_sp_wallet, lock_freezed_utxos, scan_blocks}; +use crate::wallet::{generate_sp_wallet, lock_freezed_utxos}; const EMPTYSTATEID: &str = "0000000000000000000000000000000000000000000000000000000000000000"; @@ -293,6 +294,19 @@ pub fn create_new_device(birthday: u32, network_str: String) -> ApiResult ApiResult<()> { + let local_device = lock_local_device()?; + + let sp_wallet = local_device.get_sp_wallet(); + + let last_scan = sp_wallet.get_last_scan(); + let n_blocks_to_scan = tip_height - last_scan; + crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_wallet, tip_height, last_scan).await?; + + Ok(()) +} + #[wasm_bindgen] pub fn is_paired() -> ApiResult { let local_device = lock_local_device()?; @@ -456,6 +470,7 @@ pub fn get_opreturn(transaction: String) -> ApiResult { #[wasm_bindgen] pub fn process_commit_new_state(mut process: Process, state_id: String, new_tip: String) -> ApiResult { let state_id_array: [u8; 32] = Vec::from_hex(&state_id)?.try_into().unwrap(); + let new_tip = OutPoint::from_str(&new_tip)?; let new_state: ProcessState; if let Ok(commited_state) = process.get_state_for_id(&state_id_array) { new_state = commited_state.clone(); @@ -469,7 +484,7 @@ pub fn process_commit_new_state(mut process: Process, state_id: String, new_tip: process.remove_all_concurrent_states()?; process.insert_concurrent_state(new_state)?; - process.update_states_tip(OutPoint::from_str(&new_tip)?)?; + process.update_states_tip(new_tip)?; Ok(process) } diff --git a/src/lib.rs b/src/lib.rs index 63c7a6f..b2c96a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,3 +6,6 @@ pub mod api; mod peers; mod user; mod wallet; +mod scanner; + +const WITH_CUTTHROUGH: bool = true; diff --git a/src/scanner.rs b/src/scanner.rs new file mode 100644 index 0000000..ad5a627 --- /dev/null +++ b/src/scanner.rs @@ -0,0 +1,306 @@ +use std::{collections::{HashMap, HashSet}, sync::atomic::AtomicBool}; +use web_time::{Duration, Instant}; + +use anyhow::{bail, Result}; +use futures_util::Stream; +use sdk_common::{backend_blindbit_wasm::{ChainBackend, SpScanner}, log::{self, info}, sp_client::{bitcoin::{absolute::Height, bip158::BlockFilter, hashes::{sha256, Hash}, secp256k1::PublicKey, Amount, BlockHash, OutPoint}, BlockData, FilterData, OutputSpendStatus, OwnedOutput, SpClient, Updater}}; + +pub struct WasmSpScanner<'a> { + updater: Box, + backend: Box, + client: SpClient, + keep_scanning: &'a AtomicBool, // used to interrupt scanning + owned_outpoints: HashSet, // used to scan block inputs +} + +impl<'a> WasmSpScanner<'a> { + pub fn new( + client: SpClient, + updater: Box, + backend: Box, + owned_outpoints: HashSet, + keep_scanning: &'a AtomicBool, + ) -> Self { + log::info!("Creating WasmSpScanner"); + Self { + client, + updater, + backend, + owned_outpoints, + keep_scanning, + } + } + + pub async fn process_blocks( + &mut self, + start: Height, + end: Height, + block_data_stream: impl Stream> + Unpin , + ) -> Result<()> { + use futures_util::StreamExt; + + let mut update_time = Instant::now(); + let mut stream = block_data_stream; + + while let Some(blockdata) = stream.next().await { + let blockdata = blockdata?; + let blkheight = blockdata.blkheight; + let blkhash = blockdata.blkhash; + + // stop scanning and return if interrupted + if self.should_interrupt() { + self.save_state()?; + return Ok(()); + } + + let mut save_to_storage = false; + + // always save on last block or after 30 seconds since last save + if blkheight == end || update_time.elapsed() > Duration::from_secs(30) { + save_to_storage = true; + } + + let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + + if !found_outputs.is_empty() { + save_to_storage = true; + self.record_outputs(blkheight, blkhash, found_outputs)?; + } + + if !found_inputs.is_empty() { + save_to_storage = true; + self.record_inputs(blkheight, blkhash, found_inputs)?; + } + + // tell the updater we scanned this block + self.record_progress(start, blkheight, end)?; + + if save_to_storage { + self.save_state()?; + update_time = Instant::now(); + } + } + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl<'a> SpScanner for WasmSpScanner<'a> { + async fn scan_blocks( + &mut self, + start: Height, + end: Height, + dust_limit: Amount, + with_cutthrough: bool, + ) -> Result<()> { + if start > end { + bail!("bigger start than end: {} > {}", start, end); + } + + info!("start: {} end: {}", start, end); + let start_time= web_time::Instant::now(); + + // get block data stream + let range = start.to_consensus_u32()..=end.to_consensus_u32(); + let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough); + + // process blocks using block data stream + self.process_blocks(start, end, block_data_stream).await?; + + // time elapsed for the scan + info!( + "Blindbit scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) + } + + async fn process_block( + &mut self, + blockdata: BlockData, + ) -> Result<(HashMap, HashSet)> { + let BlockData { + blkheight, + tweaks, + new_utxo_filter, + spent_filter, + .. + } = blockdata; + + let outs = self + .process_block_outputs(blkheight, tweaks, new_utxo_filter) + .await?; + + // after processing outputs, we add the found outputs to our list + self.owned_outpoints.extend(outs.keys()); + + let ins = self.process_block_inputs(blkheight, spent_filter).await?; + + // after processing inputs, we remove the found inputs + self.owned_outpoints.retain(|item| !ins.contains(item)); + + Ok((outs, ins)) + } + + async fn process_block_outputs( + &self, + blkheight: Height, + tweaks: Vec, + new_utxo_filter: FilterData, + ) -> Result> { + let mut res = HashMap::new(); + + if !tweaks.is_empty() { + let secrets_map = self.client.get_script_to_secret_map(tweaks)?; + + //last_scan = last_scan.max(n as u32); + let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect(); + + //get block gcs & check match + let blkfilter = BlockFilter::new(&new_utxo_filter.data); + let blkhash = new_utxo_filter.block_hash; + + let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?; + + //if match: fetch and scan utxos + if matched_outputs { + info!("matched outputs on: {}", blkheight); + let found = self.scan_utxos(blkheight, secrets_map).await?; + + if !found.is_empty() { + for (label, utxo, tweak) in found { + let outpoint = OutPoint { + txid: utxo.txid, + vout: utxo.vout, + }; + + let out = OwnedOutput { + blockheight: blkheight, + tweak: tweak.to_be_bytes(), + amount: utxo.value, + script: utxo.scriptpubkey, + label, + spend_status: OutputSpendStatus::Unspent, + }; + + res.insert(outpoint, out); + } + } + } + } + Ok(res) + } + + async fn process_block_inputs( + &self, + blkheight: Height, + spent_filter: FilterData, + ) -> Result> { + let mut res = HashSet::new(); + + let blkhash = spent_filter.block_hash; + + // first get the 8-byte hashes used to construct the input filter + let input_hashes_map = self.get_input_hashes(blkhash)?; + + // check against filter + let blkfilter = BlockFilter::new(&spent_filter.data); + let matched_inputs = self.check_block_inputs( + blkfilter, + blkhash, + input_hashes_map.keys().cloned().collect(), + )?; + + // if match: download spent data, collect the outpoints that are spent + if matched_inputs { + info!("matched inputs on: {}", blkheight); + let spent = self.backend.spent_index(blkheight).await?.data; + + for spent in spent { + let hex: &[u8] = spent.as_ref(); + + if let Some(outpoint) = input_hashes_map.get(hex) { + res.insert(*outpoint); + } + } + } + Ok(res) + } + + fn get_block_data_stream( + &self, + range: std::ops::RangeInclusive, + dust_limit: Amount, + with_cutthrough: bool, + ) -> std::pin::Pin>>> { + self.backend + .get_block_data_for_range(range, dust_limit, with_cutthrough) + } + + fn should_interrupt(&self) -> bool { + !self + .keep_scanning + .load(std::sync::atomic::Ordering::Relaxed) + } + + fn save_state(&mut self) -> Result<()> { + self.updater.save_to_persistent_storage() + } + + fn record_outputs( + &mut self, + height: Height, + block_hash: BlockHash, + outputs: HashMap, + ) -> Result<()> { + self.updater + .record_block_outputs(height, block_hash, outputs) + } + + fn record_inputs( + &mut self, + height: Height, + block_hash: BlockHash, + inputs: HashSet, + ) -> Result<()> { + self.updater.record_block_inputs(height, block_hash, inputs) + } + + fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> { + self.updater.record_scan_progress(start, current, end) + } + + fn client(&self) -> &SpClient { + &self.client + } + + fn backend(&self) -> &dyn ChainBackend { + self.backend.as_ref() + } + + fn updater(&mut self) -> &mut dyn Updater { + self.updater.as_mut() + } + + // Override the default get_input_hashes implementation to use owned_outpoints + fn get_input_hashes(&self, blkhash: BlockHash) -> Result> { + let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new(); + + for outpoint in &self.owned_outpoints { + let mut arr = [0u8; 68]; + arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array()); + arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes()); + arr[36..].copy_from_slice(&blkhash.to_byte_array()); + let hash = sha256::Hash::hash(&arr); + + let mut res = [0u8; 8]; + res.copy_from_slice(&hash[..8]); + + map.insert(res, outpoint.clone()); + } + + Ok(map) + } +} diff --git a/src/wallet.rs b/src/wallet.rs index 54f62eb..42e5703 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1,17 +1,18 @@ use std::{ collections::HashSet, - sync::{Mutex, MutexGuard, OnceLock}, + sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard, OnceLock}, }; +use web_time::Instant; use anyhow::Error; use rand::Rng; -use sdk_common::sp_client::{ - bitcoin::{secp256k1::SecretKey, Network, OutPoint}, +use sdk_common::{backend_blindbit_wasm::{BlindbitBackend, SpScanner}, log, silentpayments::SpWallet, sp_client::{ + bitcoin::{absolute::Height, secp256k1::SecretKey, Amount, Network, OutPoint}, silentpayments::SilentPaymentAddress, SpClient, SpendKey, -}; +}, updates::StateUpdater}; -use crate::MutexExt; +use crate::{scanner::WasmSpScanner, MutexExt, WITH_CUTTHROUGH}; pub static FREEZED_UTXOS: OnceLock>> = OnceLock::new(); @@ -29,3 +30,59 @@ pub fn generate_sp_wallet(network: Network) -> anyhow::Result { network, ) } + +pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_wallet: &SpWallet, tip_height: u32, scan_height: u32) -> anyhow::Result<()> { + log::info!("Starting a rescan"); + + // 0 means scan to tip + if n_blocks_to_scan == 0 { + n_blocks_to_scan = tip_height - scan_height; + } + + let start = scan_height + 1; + let end = if scan_height + n_blocks_to_scan <= tip_height { + scan_height + n_blocks_to_scan + } else { + tip_height + }; + + if start > end { + return Ok(()); + } + + let updater = StateUpdater::new(); + let backend = BlindbitBackend::new(blindbit_url.to_string())?; + + let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); + + let keep_scanning = Arc::new(AtomicBool::new(true)); + + log::info!("start: {} end: {}", start, end); + let start_time = Instant::now(); + log::info!("{:?}", start_time); + let mut scanner = WasmSpScanner::new( + sp_wallet.get_sp_client().clone(), + Box::new(updater), + Box::new(backend), + owned_outpoints, + &keep_scanning, + ); + + let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case + scanner + .scan_blocks( + Height::from_consensus(start)?, + Height::from_consensus(end)?, + dust_limit, + WITH_CUTTHROUGH, + ) + .await?; + + // time elapsed for the scan + log::info!( + "Scan complete in {} seconds", + start_time.elapsed().as_secs() + ); + + Ok(()) +}