// sdk_client/src/wallet.rs
// 2025-06-24 17:24:31 +02:00
//
// 302 lines
// 9.0 KiB
// Rust

use std::{
collections::{HashMap, HashSet}, ops::RangeInclusive, pin::Pin, sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard, OnceLock}, time::Instant
};
use anyhow::{bail, Result, Error};
use futures::Stream;
use rand::Rng;
use sdk_common::{log, sp_client::{
bitcoin::{absolute::Height, hashes::{sha256, Hash}, secp256k1::{PublicKey, SecretKey}, Amount, BlockHash, Network, OutPoint}, silentpayments::SilentPaymentAddress, BlindbitBackend, BlockData, FilterData, OutputSpendStatus, OwnedOutput, SpClient, SpScanner, SpendKey, SpentIndexData, Updater, UtxoData
}, updates::StateUpdater};
use sdk_common::sp_client::ChainBackendWasm;
use crate::{user::lock_local_device, MutexExt};
/// Process-wide set of "freezed" (frozen) outpoints, shared behind a mutex.
///
/// The cell starts out uninitialised; [`lock_freezed_utxos`] fills it with an
/// empty set on first use and hands out the lock guard.
// NOTE(review): presumably these are UTXOs temporarily reserved so that they are
// not picked for spending twice — confirm against the callers of this set.
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
/// Locks the global [`FREEZED_UTXOS`] set and returns the guard.
///
/// The set is created empty the first time this function is called.
///
/// # Errors
/// Returns whatever error `lock_anyhow` reports.
// NOTE(review): `lock_anyhow` comes from `MutexExt`; presumably it turns a
// poisoned-mutex failure into an `anyhow` error — confirm in that trait.
pub fn lock_freezed_utxos() -> Result<MutexGuard<'static, HashSet<OutPoint>>, Error> {
    let frozen = FREEZED_UTXOS.get_or_init(Mutex::default);
    frozen.lock_anyhow()
}
/// Creates a brand-new silent-payment client for `network` from two freshly
/// generated random secret keys.
///
/// Both keys are drawn from the thread-local RNG, the first one before the
/// second. The second key is wrapped in `SpendKey::Secret`.
///
/// # Errors
/// Propagates any error returned by `SpClient::new`.
pub fn generate_sp_wallet(network: Network) -> anyhow::Result<SpClient> {
    let mut rng = rand::thread_rng();

    // NOTE(review): the first argument of `SpClient::new` is presumably the
    // scan key — confirm against the `sp_client` API.
    let first_secret = SecretKey::new(&mut rng);
    let spend_secret = SecretKey::new(&mut rng);

    SpClient::new(first_secret, SpendKey::Secret(spend_secret), network)
}
/// Block scanner that pulls block data from a Blindbit backend and reports what
/// it finds to an [`Updater`].
///
/// The lifetime `'a` is the lifetime of the borrowed interruption flag.
pub struct WasmSpScanner<'a> {
/// Receives found outputs/inputs and scan progress, and is asked to persist state.
updater: Box<dyn Updater + Sync>,
/// Source of the block data stream used by `scan_blocks`.
backend: BlindbitBackend,
/// Silent-payment client handed in at construction.
// NOTE(review): not read by any method visible in this file — presumably needed
// by the not-yet-implemented output/input processing.
client: SpClient,
/// Scanning continues while this flag is `true`; once it reads `false` the
/// block loop saves state and returns.
keep_scanning: &'a AtomicBool,
/// Outpoints currently considered owned: grows with outputs found in a block
/// and shrinks by the inputs found spent in it.
owned_outpoints: HashSet<OutPoint>,
}
impl<'a> WasmSpScanner<'a> {
pub fn new(
client: SpClient,
updater: Box<dyn Updater + Sync>,
backend: BlindbitBackend,
owned_outpoints: HashSet<OutPoint>,
keep_scanning: &'a AtomicBool,
) -> Self {
Self {
client,
updater,
backend,
owned_outpoints,
keep_scanning,
}
}
/// Scans the inclusive height range `start..=end`.
///
/// Requests a block-data stream for the range from the backend (passing
/// `dust_limit` and `with_cutthrough` through unchanged), feeds it to
/// `process_blocks`, and logs how long the whole scan took.
///
/// # Errors
/// Fails immediately if `start` is greater than `end`, and otherwise
/// propagates any error raised while processing the stream.
pub async fn scan_blocks(
    &mut self,
    start: Height,
    end: Height,
    dust_limit: Amount,
    with_cutthrough: bool,
) -> Result<()> {
    // Refuse an inverted range before asking the backend for anything.
    if end < start {
        bail!("bigger start than end: {} > {}", start, end);
    }

    log::info!("start: {} end: {}", start, end);
    let timer: Instant = Instant::now();

    // The backend works on raw consensus heights, both ends included.
    let first = start.to_consensus_u32();
    let last = end.to_consensus_u32();
    let blocks = self
        .backend
        .get_block_data_for_range(first..=last, dust_limit, with_cutthrough);

    self.process_blocks(start, end, blocks).await?;

    let elapsed_secs = timer.elapsed().as_secs();
    log::info!("Blindbit scan complete in {} seconds", elapsed_secs);
    Ok(())
}
/// Drains `block_data_stream`, handling one block at a time.
///
/// For every block pulled from the stream:
/// 1. a stream error aborts the scan;
/// 2. if the interruption flag is set, state is saved and the function returns
///    `Ok(())` without processing that block;
/// 3. the block is processed and any found outputs / inputs are recorded;
/// 4. progress is reported to the updater;
/// 5. state is persisted when the block is `end`, when more than 30 seconds
///    have passed since the last save, or when anything was found.
///
/// `start` and `end` are only used for progress reporting and the
/// "last block" check.
async fn process_blocks(
    &mut self,
    start: Height,
    end: Height,
    block_data_stream: Pin<Box<dyn Stream<Item = Result<BlockData>>>>,
) -> Result<()> {
    use sdk_common::sp_client::futures::StreamExt;
    use std::time::{Duration, Instant};

    // Longest time we go without persisting while nothing is found.
    const SAVE_INTERVAL: Duration = Duration::from_secs(30);

    let mut blocks = block_data_stream;
    let mut last_save = Instant::now();

    while let Some(next) = blocks.next().await {
        let block = next?;
        // Copy these out now: `block` is moved into `process_block` below.
        let (height, hash) = (block.blkheight, block.blkhash);

        // stop scanning and return if interrupted
        if self.should_interrupt() {
            return self.save_state();
        }

        // Decided before processing, so the elapsed time excludes this block's work.
        let save_due = height == end || last_save.elapsed() > SAVE_INTERVAL;

        let (found_outputs, found_inputs) = self.process_block(block).await?;
        let has_outputs = !found_outputs.is_empty();
        let has_inputs = !found_inputs.is_empty();

        if has_outputs {
            self.record_outputs(height, hash, found_outputs)?;
        }
        if has_inputs {
            self.record_inputs(height, hash, found_inputs)?;
        }

        // tell the updater we scanned this block
        self.record_progress(start, height, end)?;

        if save_due || has_outputs || has_inputs {
            self.save_state()?;
            last_save = Instant::now();
        }
    }

    Ok(())
}
/// Processes a single block and returns `(found outputs, found spent inputs)`.
///
/// Outputs are looked for first and added to `owned_outpoints` before the
/// inputs are examined; inputs found spent are then removed from the set.
// NOTE(review): adding outputs before checking inputs presumably lets an output
// created and spent in the same block be detected — confirm once the input
// processing is implemented.
async fn process_block(
    &mut self,
    blockdata: BlockData,
) -> Result<(HashMap<OutPoint, OwnedOutput>, HashSet<OutPoint>)> {
    let BlockData {
        blkheight: height,
        tweaks,
        new_utxo_filter: utxo_filter,
        spent_filter,
        ..
    } = blockdata;

    let found_outputs = self
        .process_block_outputs(height, tweaks, utxo_filter)
        .await?;
    // Newly found outputs become owned right away.
    self.owned_outpoints.extend(found_outputs.keys().copied());

    let spent = self.process_block_inputs(height, spent_filter).await?;
    // Anything seen as spent is no longer owned.
    for outpoint in &spent {
        self.owned_outpoints.remove(outpoint);
    }

    Ok((found_outputs, spent))
}
/// Placeholder for output detection in one block.
///
/// Currently ignores `blkheight`, `tweaks` and `new_utxo_filter` and always
/// reports that no outputs were found, so a scan never records any output.
async fn process_block_outputs(
    &self,
    blkheight: Height,
    tweaks: Vec<PublicKey>,
    new_utxo_filter: FilterData,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
    // TODO: implement the actual output scanning; until then nothing is found.
    let found_outputs = HashMap::new();
    Ok(found_outputs)
}
/// Placeholder for spent-input detection in one block.
///
/// Currently ignores `blkheight` and `spent_filter` and always reports that no
/// owned outpoint was spent, so a scan never records any spent input.
async fn process_block_inputs(
    &self,
    blkheight: Height,
    spent_filter: FilterData,
) -> Result<HashSet<OutPoint>> {
    // TODO: implement the actual spent-input detection; until then nothing is found.
    let spent_outpoints = HashSet::new();
    Ok(spent_outpoints)
}
/// Returns `true` when the scan has been asked to stop, i.e. when the
/// `keep_scanning` flag reads `false` (relaxed atomic load).
fn should_interrupt(&self) -> bool {
    use std::sync::atomic::Ordering;

    let keep_going = self.keep_scanning.load(Ordering::Relaxed);
    !keep_going
}
/// Asks the updater to write its current state to persistent storage.
///
/// # Errors
/// Propagates the updater's error unchanged.
fn save_state(&mut self) -> Result<()> {
    let updater = &mut *self.updater;
    updater.save_to_persistent_storage()
}
/// Hands the outputs found in the block at `height` / `block_hash` to the updater.
///
/// # Errors
/// Propagates the updater's error unchanged.
fn record_outputs(
    &mut self,
    height: Height,
    block_hash: BlockHash,
    outputs: HashMap<OutPoint, OwnedOutput>,
) -> Result<()> {
    let updater = &mut *self.updater;
    updater.record_block_outputs(height, block_hash, outputs)
}
/// Hands the spent outpoints found in the block at `height` / `block_hash` to the updater.
///
/// # Errors
/// Propagates the updater's error unchanged.
fn record_inputs(
    &mut self,
    height: Height,
    block_hash: BlockHash,
    inputs: HashSet<OutPoint>,
) -> Result<()> {
    let updater = &mut *self.updater;
    updater.record_block_inputs(height, block_hash, inputs)
}
/// Reports to the updater that `current` has been scanned within the range `start..=end`.
///
/// # Errors
/// Propagates the updater's error unchanged.
fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> {
    let updater = &mut *self.updater;
    updater.record_scan_progress(start, current, end)
}
/// Builds a lookup table from an 8-byte short hash to each owned outpoint, for
/// the block identified by `blkhash`.
///
/// The short hash is the first 8 bytes of
/// `SHA-256(txid bytes (32) || vout as little-endian u32 (4) || block hash bytes (32))`.
/// If two outpoints ever collide on the short hash, the one visited last wins.
// NOTE(review): this is an inherent method and nothing in the visible part of
// this file calls it; presumably it is meant to back the spent-input detection
// (or an `SpScanner` trait impl) — confirm.
fn get_input_hashes(&self, blkhash: BlockHash) -> Result<HashMap<[u8; 8], OutPoint>> {
    let block_bytes = blkhash.to_byte_array();

    let short_hashes: HashMap<[u8; 8], OutPoint> = self
        .owned_outpoints
        .iter()
        .map(|outpoint| {
            // Preimage layout: txid | vout (LE) | block hash.
            let mut preimage = [0u8; 68];
            preimage[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array());
            preimage[32..36].copy_from_slice(&outpoint.vout.to_le_bytes());
            preimage[36..].copy_from_slice(&block_bytes);

            let digest = sha256::Hash::hash(&preimage);
            let mut short = [0u8; 8];
            short.copy_from_slice(&digest[..8]);

            (short, *outpoint)
        })
        .collect();

    Ok(short_hashes)
}
}
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str, tip_height: u32, with_cutthrough: bool) -> anyhow::Result<()> {
log::info!("Starting a rescan");
// Get all the data we need upfront, before any async operations
let device = lock_local_device()?;
let sp_wallet = device.get_sp_wallet();
let scan_height = sp_wallet.get_last_scan();
// 0 means scan to tip
if n_blocks_to_scan == 0 {
n_blocks_to_scan = tip_height - scan_height;
}
let start = scan_height + 1;
let end = if scan_height + n_blocks_to_scan <= tip_height {
scan_height + n_blocks_to_scan
} else {
tip_height
};
if start > end {
return Ok(());
}
let updater = StateUpdater::new();
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
let keep_scanning = Arc::new(AtomicBool::new(true));
log::info!("start: {} end: {}", start, end);
let start_time = Instant::now();
let mut scanner = WasmSpScanner::new(
sp_wallet.get_sp_client().clone(),
Box::new(updater),
backend,
owned_outpoints,
&keep_scanning,
);
let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case
scanner
.scan_blocks(
Height::from_consensus(start)?,
Height::from_consensus(end)?,
dust_limit,
with_cutthrough,
)
.await?;
// time elapsed for the scan
log::info!(
"Scan complete in {} seconds",
start_time.elapsed().as_secs()
);
Ok(())
}