Refactor scan_blocks to prevent potential deadlocks

This commit is contained in:
Sosthene 2025-08-25 01:05:56 +02:00
parent 188bc9b179
commit 2da5bf7a71
2 changed files with 92 additions and 17 deletions

View File

@ -6,6 +6,8 @@ use std::ops::Index;
use std::str::FromStr;
use std::string::FromUtf8Error;
use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};
use futures_util::StreamExt;
use sdk_common::updates::{init_update_sink, StateUpdate, WasmUpdateSink};
use web_time::{Duration, Instant};
use std::u32;
@ -296,13 +298,90 @@ pub fn create_new_device(birthday: u32, network_str: String) -> ApiResult<String
#[wasm_bindgen]
pub async fn scan_blocks(tip_height: u32, blindbit_url: String) -> ApiResult<()> {
let (sink, mut scan_rx, mut state_rx) = WasmUpdateSink::new();
// Initialize the global sink
init_update_sink(sink);
wasm_bindgen_futures::spawn_local(async move {
while let Some(progress) = scan_rx.next().await {
log::info!(
"Scan progress: {}/{} ({}%)",
progress.current,
progress.end,
(progress.current * 100) / progress.end
);
}
});
wasm_bindgen_futures::spawn_local(async move {
while let Some(update) = state_rx.next().await {
match update {
StateUpdate::Update {
blkheight,
blkhash,
found_outputs,
found_inputs,
} => {
log::info!(
"Processing update at height {}: {} outputs, {} inputs",
blkheight, found_outputs.len(), found_inputs.len()
);
// Actually update the device's wallet state in WASM context
if let Ok(mut local_device) = lock_local_device() {
// Get direct access to the device's outputs to modify them
let device_outputs = local_device.get_mut_outputs();
// Mark found inputs as spent
for outpoint in found_inputs {
if let Some(output) = device_outputs.get_mut(&outpoint) {
output.spend_status = OutputSpendStatus::Spent(blkhash.to_byte_array());
} else {
// This should never happen, but we'll log it just in case
log::error!("Found a spent input that we don't know about: {}", outpoint.to_string());
}
}
// Add new found outputs to the device
device_outputs.extend(found_outputs);
// update last_scan
local_device.get_mut_sp_wallet().set_last_scan(blkheight.to_consensus_u32());
log::debug!("Updated device outputs and marked inputs as spent");
} else {
log::error!("Failed to lock local device for state update");
}
log::debug!("State update processed at height: {}", blkheight);
}
StateUpdate::NoUpdate { blkheight } => {
log::debug!("No update at height: {}", blkheight);
// We still need to update last_scan
if let Ok(mut local_device) = lock_local_device() {
let sp_wallet = local_device.get_mut_sp_wallet();
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
} else {
log::error!("Failed to lock local device for last_scan update");
}
}
}
}
});
let (sp_client, owned_outpoints, last_scan) = {
let local_device = lock_local_device()?;
let owned_outpoints: HashSet<OutPoint> = local_device.get_sp_wallet().get_unspent_outputs().keys().map(|o| *o).collect();
let sp_client = local_device.get_sp_client().clone();
let last_scan = local_device.get_sp_wallet().get_last_scan();
let sp_wallet = local_device.get_sp_wallet();
(sp_client, owned_outpoints, last_scan)
};
let last_scan = sp_wallet.get_last_scan();
let n_blocks_to_scan = tip_height - last_scan;
crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_wallet, tip_height, last_scan).await?;
crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_client, owned_outpoints, tip_height, last_scan, 10).await?;
Ok(())
}

View File

@ -31,9 +31,15 @@ pub fn generate_sp_wallet(network: Network) -> anyhow::Result<SpClient> {
)
}
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_wallet: &SpWallet, tip_height: u32, scan_height: u32) -> anyhow::Result<()> {
log::info!("Starting a rescan");
pub async fn scan_blocks(
mut n_blocks_to_scan: u32,
blindbit_url: &str,
sp_client: SpClient,
owned_outpoints: HashSet<OutPoint>,
tip_height: u32,
scan_height: u32,
save_interval: u32,
) -> anyhow::Result<()> {
// 0 means scan to tip
if n_blocks_to_scan == 0 {
n_blocks_to_scan = tip_height - scan_height;
@ -53,15 +59,11 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_walle
let updater = StateUpdater::new();
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
let keep_scanning = Arc::new(AtomicBool::new(true));
log::info!("start: {} end: {}", start, end);
let start_time = Instant::now();
log::info!("{:?}", start_time);
let mut scanner = WasmSpScanner::new(
sp_wallet.get_sp_client().clone(),
sp_client,
Box::new(updater),
Box::new(backend),
owned_outpoints,
@ -78,11 +80,5 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_walle
)
.await?;
// time elapsed for the scan
log::info!(
"Scan complete in {} seconds",
start_time.elapsed().as_secs()
);
Ok(())
}