diff --git a/src/api.rs b/src/api.rs index fe6d023..1149277 100644 --- a/src/api.rs +++ b/src/api.rs @@ -6,6 +6,8 @@ use std::ops::Index; use std::str::FromStr; use std::string::FromUtf8Error; use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError}; +use futures_util::StreamExt; +use sdk_common::updates::{init_update_sink, StateUpdate, WasmUpdateSink}; use web_time::{Duration, Instant}; use std::u32; @@ -296,13 +298,90 @@ pub fn create_new_device(birthday: u32, network_str: String) -> ApiResult ApiResult<()> { + let (sink, mut scan_rx, mut state_rx) = WasmUpdateSink::new(); + + // Initialize the global sink + init_update_sink(sink); + + wasm_bindgen_futures::spawn_local(async move { + while let Some(progress) = scan_rx.next().await { + log::info!( + "Scan progress: {}/{} ({}%)", + progress.current, + progress.end, + (progress.current * 100) / progress.end + ); + } + }); + + wasm_bindgen_futures::spawn_local(async move { + while let Some(update) = state_rx.next().await { + match update { + StateUpdate::Update { + blkheight, + blkhash, + found_outputs, + found_inputs, + } => { + log::info!( + "Processing update at height {}: {} outputs, {} inputs", + blkheight, found_outputs.len(), found_inputs.len() + ); + + // Actually update the device's wallet state in WASM context + if let Ok(mut local_device) = lock_local_device() { + // Get direct access to the device's outputs to modify them + let device_outputs = local_device.get_mut_outputs(); + + // Mark found inputs as spent + for outpoint in found_inputs { + if let Some(output) = device_outputs.get_mut(&outpoint) { + output.spend_status = OutputSpendStatus::Spent(blkhash.to_byte_array()); + } else { + // This should never happen, but we'll log it just in case + log::error!("Found a spent input that we don't know about: {}", outpoint.to_string()); + } + } + + // Add new found outputs to the device + device_outputs.extend(found_outputs); + + // update last_scan + local_device.get_mut_sp_wallet().set_last_scan(blkheight.to_consensus_u32()); + + log::debug!("Updated device outputs and marked inputs as spent"); + } else { + log::error!("Failed to lock local device for state update"); + } + + log::debug!("State update processed at height: {}", blkheight); + } + StateUpdate::NoUpdate { blkheight } => { + log::debug!("No update at height: {}", blkheight); + // We still need to update last_scan + if let Ok(mut local_device) = lock_local_device() { + let sp_wallet = local_device.get_mut_sp_wallet(); + sp_wallet.set_last_scan(blkheight.to_consensus_u32()); + } else { + log::error!("Failed to lock local device for last_scan update"); + } + } + } + } + }); + + let (sp_client, owned_outpoints, last_scan) = { let local_device = lock_local_device()?; + let owned_outpoints: HashSet = local_device.get_sp_wallet().get_unspent_outputs().keys().map(|o| *o).collect(); + let sp_client = local_device.get_sp_client().clone(); + let last_scan = local_device.get_sp_wallet().get_last_scan(); - let sp_wallet = local_device.get_sp_wallet(); + (sp_client, owned_outpoints, last_scan) + }; - let last_scan = sp_wallet.get_last_scan(); let n_blocks_to_scan = tip_height - last_scan; - crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_wallet, tip_height, last_scan).await?; + + crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_client, owned_outpoints, tip_height, last_scan, 10).await?; Ok(()) } diff --git a/src/wallet.rs b/src/wallet.rs index 42e5703..d9c90db 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -31,9 +31,15 @@ pub fn generate_sp_wallet(network: Network) -> anyhow::Result { ) } -pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_wallet: &SpWallet, tip_height: u32, scan_height: u32) -> anyhow::Result<()> { - log::info!("Starting a rescan"); - +pub async fn scan_blocks( + mut n_blocks_to_scan: u32, + blindbit_url: &str, + sp_client: SpClient, + owned_outpoints: HashSet, + tip_height: u32, + scan_height: u32, + save_interval: u32, +) -> anyhow::Result<()> { // 0 means scan to tip if n_blocks_to_scan == 0 { n_blocks_to_scan = tip_height - scan_height; @@ -53,15 +59,11 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_walle let updater = StateUpdater::new(); let backend = BlindbitBackend::new(blindbit_url.to_string())?; - let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect(); - let keep_scanning = Arc::new(AtomicBool::new(true)); - log::info!("start: {} end: {}", start, end); let start_time = Instant::now(); - log::info!("{:?}", start_time); let mut scanner = WasmSpScanner::new( - sp_wallet.get_sp_client().clone(), + sp_client, Box::new(updater), Box::new(backend), owned_outpoints, @@ -78,11 +80,5 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_walle ) .await?; - // time elapsed for the scan - log::info!( - "Scan complete in {} seconds", - start_time.elapsed().as_secs() - ); - Ok(()) }