Compare commits
No commits in common. "c3409c4460886ec0d452375d24495422710e70cd" and "188bc9b179396878f09abd10db89f8c97d576f26" have entirely different histories.
c3409c4460
...
188bc9b179
87
src/api.rs
87
src/api.rs
@ -6,8 +6,6 @@ use std::ops::Index;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::string::FromUtf8Error;
|
use std::string::FromUtf8Error;
|
||||||
use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};
|
use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};
|
||||||
use futures_util::StreamExt;
|
|
||||||
use sdk_common::updates::{init_update_sink, StateUpdate, WasmUpdateSink};
|
|
||||||
use web_time::{Duration, Instant};
|
use web_time::{Duration, Instant};
|
||||||
use std::u32;
|
use std::u32;
|
||||||
|
|
||||||
@ -298,90 +296,13 @@ pub fn create_new_device(birthday: u32, network_str: String) -> ApiResult<String
|
|||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
pub async fn scan_blocks(tip_height: u32, blindbit_url: String) -> ApiResult<()> {
|
pub async fn scan_blocks(tip_height: u32, blindbit_url: String) -> ApiResult<()> {
|
||||||
let (sink, mut scan_rx, mut state_rx) = WasmUpdateSink::new();
|
|
||||||
|
|
||||||
// Initialize the global sink
|
|
||||||
init_update_sink(sink);
|
|
||||||
|
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
|
||||||
while let Some(progress) = scan_rx.next().await {
|
|
||||||
log::info!(
|
|
||||||
"Scan progress: {}/{} ({}%)",
|
|
||||||
progress.current,
|
|
||||||
progress.end,
|
|
||||||
(progress.current * 100) / progress.end
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
|
||||||
while let Some(update) = state_rx.next().await {
|
|
||||||
match update {
|
|
||||||
StateUpdate::Update {
|
|
||||||
blkheight,
|
|
||||||
blkhash,
|
|
||||||
found_outputs,
|
|
||||||
found_inputs,
|
|
||||||
} => {
|
|
||||||
log::info!(
|
|
||||||
"Processing update at height {}: {} outputs, {} inputs",
|
|
||||||
blkheight, found_outputs.len(), found_inputs.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Actually update the device's wallet state in WASM context
|
|
||||||
if let Ok(mut local_device) = lock_local_device() {
|
|
||||||
// Get direct access to the device's outputs to modify them
|
|
||||||
let device_outputs = local_device.get_mut_outputs();
|
|
||||||
|
|
||||||
// Mark found inputs as spent
|
|
||||||
for outpoint in found_inputs {
|
|
||||||
if let Some(output) = device_outputs.get_mut(&outpoint) {
|
|
||||||
output.spend_status = OutputSpendStatus::Spent(blkhash.to_byte_array());
|
|
||||||
} else {
|
|
||||||
// This should never happen, but we'll log it just in case
|
|
||||||
log::error!("Found a spent input that we don't know about: {}", outpoint.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add new found outputs to the device
|
|
||||||
device_outputs.extend(found_outputs);
|
|
||||||
|
|
||||||
// update last_scan
|
|
||||||
local_device.get_mut_sp_wallet().set_last_scan(blkheight.to_consensus_u32());
|
|
||||||
|
|
||||||
log::debug!("Updated device outputs and marked inputs as spent");
|
|
||||||
} else {
|
|
||||||
log::error!("Failed to lock local device for state update");
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("State update processed at height: {}", blkheight);
|
|
||||||
}
|
|
||||||
StateUpdate::NoUpdate { blkheight } => {
|
|
||||||
log::debug!("No update at height: {}", blkheight);
|
|
||||||
// We still need to update last_scan
|
|
||||||
if let Ok(mut local_device) = lock_local_device() {
|
|
||||||
let sp_wallet = local_device.get_mut_sp_wallet();
|
|
||||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
|
||||||
} else {
|
|
||||||
log::error!("Failed to lock local device for last_scan update");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let (sp_client, owned_outpoints, last_scan) = {
|
|
||||||
let local_device = lock_local_device()?;
|
let local_device = lock_local_device()?;
|
||||||
let owned_outpoints: HashSet<OutPoint> = local_device.get_sp_wallet().get_unspent_outputs().keys().map(|o| *o).collect();
|
|
||||||
let sp_client = local_device.get_sp_client().clone();
|
|
||||||
let last_scan = local_device.get_sp_wallet().get_last_scan();
|
|
||||||
|
|
||||||
(sp_client, owned_outpoints, last_scan)
|
let sp_wallet = local_device.get_sp_wallet();
|
||||||
};
|
|
||||||
|
|
||||||
|
let last_scan = sp_wallet.get_last_scan();
|
||||||
let n_blocks_to_scan = tip_height - last_scan;
|
let n_blocks_to_scan = tip_height - last_scan;
|
||||||
|
crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_wallet, tip_height, last_scan).await?;
|
||||||
crate::wallet::scan_blocks(n_blocks_to_scan, &blindbit_url, sp_client, owned_outpoints, tip_height, last_scan, 10).await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -549,7 +470,7 @@ pub fn get_opreturn(transaction: String) -> ApiResult<String> {
|
|||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
pub fn process_commit_new_state(mut process: Process, state_id: String, new_tip: String) -> ApiResult<Process> {
|
pub fn process_commit_new_state(mut process: Process, state_id: String, new_tip: String) -> ApiResult<Process> {
|
||||||
let state_id_array: [u8; 32] = Vec::from_hex(&state_id)?.try_into().unwrap();
|
let state_id_array: [u8; 32] = Vec::from_hex(&state_id)?.try_into().unwrap();
|
||||||
let new_tip = OutPoint::new(Txid::from_str(&new_tip)?, 0);
|
let new_tip = OutPoint::from_str(&new_tip)?;
|
||||||
let new_state: ProcessState;
|
let new_state: ProcessState;
|
||||||
if let Ok(commited_state) = process.get_state_for_id(&state_id_array) {
|
if let Ok(commited_state) = process.get_state_for_id(&state_id_array) {
|
||||||
new_state = commited_state.clone();
|
new_state = commited_state.clone();
|
||||||
|
@ -21,6 +21,7 @@ impl<'a> WasmSpScanner<'a> {
|
|||||||
owned_outpoints: HashSet<OutPoint>,
|
owned_outpoints: HashSet<OutPoint>,
|
||||||
keep_scanning: &'a AtomicBool,
|
keep_scanning: &'a AtomicBool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
log::info!("Creating WasmSpScanner");
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
updater,
|
updater,
|
||||||
@ -41,9 +42,6 @@ impl<'a> WasmSpScanner<'a> {
|
|||||||
let mut update_time = Instant::now();
|
let mut update_time = Instant::now();
|
||||||
let mut stream = block_data_stream;
|
let mut stream = block_data_stream;
|
||||||
|
|
||||||
let save_interval = 10;
|
|
||||||
let mut blocks_scanned = 1;
|
|
||||||
|
|
||||||
while let Some(blockdata) = stream.next().await {
|
while let Some(blockdata) = stream.next().await {
|
||||||
let blockdata = blockdata?;
|
let blockdata = blockdata?;
|
||||||
let blkheight = blockdata.blkheight;
|
let blkheight = blockdata.blkheight;
|
||||||
@ -57,8 +55,8 @@ impl<'a> WasmSpScanner<'a> {
|
|||||||
|
|
||||||
let mut save_to_storage = false;
|
let mut save_to_storage = false;
|
||||||
|
|
||||||
// always save on last block or after scanning some number of blocks
|
// always save on last block or after 30 seconds since last save
|
||||||
if blkheight == end || blocks_scanned % save_interval == 0 {
|
if blkheight == end || update_time.elapsed() > Duration::from_secs(30) {
|
||||||
save_to_storage = true;
|
save_to_storage = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,8 +79,6 @@ impl<'a> WasmSpScanner<'a> {
|
|||||||
self.save_state()?;
|
self.save_state()?;
|
||||||
update_time = Instant::now();
|
update_time = Instant::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
blocks_scanned += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -31,15 +31,9 @@ pub fn generate_sp_wallet(network: Network) -> anyhow::Result<SpClient> {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn scan_blocks(
|
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, sp_wallet: &SpWallet, tip_height: u32, scan_height: u32) -> anyhow::Result<()> {
|
||||||
mut n_blocks_to_scan: u32,
|
log::info!("Starting a rescan");
|
||||||
blindbit_url: &str,
|
|
||||||
sp_client: SpClient,
|
|
||||||
owned_outpoints: HashSet<OutPoint>,
|
|
||||||
tip_height: u32,
|
|
||||||
scan_height: u32,
|
|
||||||
save_interval: u32,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
// 0 means scan to tip
|
// 0 means scan to tip
|
||||||
if n_blocks_to_scan == 0 {
|
if n_blocks_to_scan == 0 {
|
||||||
n_blocks_to_scan = tip_height - scan_height;
|
n_blocks_to_scan = tip_height - scan_height;
|
||||||
@ -59,11 +53,15 @@ pub async fn scan_blocks(
|
|||||||
let updater = StateUpdater::new();
|
let updater = StateUpdater::new();
|
||||||
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
|
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
|
||||||
|
|
||||||
|
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
|
||||||
|
|
||||||
let keep_scanning = Arc::new(AtomicBool::new(true));
|
let keep_scanning = Arc::new(AtomicBool::new(true));
|
||||||
|
|
||||||
|
log::info!("start: {} end: {}", start, end);
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
|
log::info!("{:?}", start_time);
|
||||||
let mut scanner = WasmSpScanner::new(
|
let mut scanner = WasmSpScanner::new(
|
||||||
sp_client,
|
sp_wallet.get_sp_client().clone(),
|
||||||
Box::new(updater),
|
Box::new(updater),
|
||||||
Box::new(backend),
|
Box::new(backend),
|
||||||
owned_outpoints,
|
owned_outpoints,
|
||||||
@ -80,5 +78,11 @@ pub async fn scan_blocks(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// time elapsed for the scan
|
||||||
|
log::info!(
|
||||||
|
"Scan complete in {} seconds",
|
||||||
|
start_time.elapsed().as_secs()
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user