Scan block with blindbit instead of electrs
This commit is contained in:
parent
294734e089
commit
76e4c985c1
@ -18,7 +18,10 @@ use sdk_common::{
|
||||
sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
|
||||
};
|
||||
|
||||
use crate::message::{broadcast_message, BroadcastType};
|
||||
use crate::{
|
||||
CHAIN_TIP,
|
||||
message::{broadcast_message, BroadcastType},
|
||||
};
|
||||
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
|
||||
|
||||
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||
@ -48,10 +51,12 @@ pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoin
|
||||
let mut new_process_map = HashMap::new();
|
||||
let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
|
||||
new_process_map.insert(commit_msg.process_id, new_process);
|
||||
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address.to_string(),
|
||||
OutPointMemberMap(HashMap::new()),
|
||||
OutPointProcessMap(new_process_map),
|
||||
current_tip.into(),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
|
@ -78,8 +78,7 @@ impl Config {
|
||||
.to_owned(),
|
||||
data_dir: file_content
|
||||
.remove("data_dir")
|
||||
.ok_or(Error::msg("No \"data_dir\""))?
|
||||
.to_owned(),
|
||||
.unwrap_or_else(|| ".4nk".to_string()),
|
||||
};
|
||||
|
||||
Ok(config)
|
||||
|
120
src/main.rs
120
src/main.rs
@ -7,11 +7,11 @@ use std::{
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
sync::{atomic::AtomicU64, Mutex, MutexGuard, OnceLock},
|
||||
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
|
||||
};
|
||||
|
||||
use bitcoincore_rpc::{
|
||||
bitcoin::secp256k1::SecretKey,
|
||||
bitcoin::{hashes::Hash, secp256k1::SecretKey},
|
||||
json::{self as bitcoin_json},
|
||||
};
|
||||
use commit::{lock_members, MEMBERLIST};
|
||||
@ -19,15 +19,10 @@ use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
||||
use log::{debug, error, warn};
|
||||
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
||||
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
|
||||
use sdk_common::sp_client::{
|
||||
bitcoin::secp256k1::rand::{thread_rng, Rng},
|
||||
bitcoin::OutPoint,
|
||||
SpClient, SpendKey,
|
||||
};
|
||||
use sdk_common::{
|
||||
error::AnkError,
|
||||
network::{AnkFlag, NewTxMessage},
|
||||
};
|
||||
use sdk_common::{sp_client::{
|
||||
bitcoin::{secp256k1::rand::thread_rng, OutPoint}, OutputSpendStatus, SpClient, SpendKey
|
||||
}, updates::{init_update_sink, NativeUpdateSink, StateUpdate}};
|
||||
use sdk_common::network::{AnkFlag, NewTxMessage};
|
||||
use sdk_common::{
|
||||
network::HandshakeMessage,
|
||||
pcd::Member,
|
||||
@ -41,7 +36,6 @@ use sdk_common::{
|
||||
Amount, Network, Transaction,
|
||||
},
|
||||
silentpayments::SilentPaymentAddress,
|
||||
OwnedOutput,
|
||||
},
|
||||
MutexExt,
|
||||
};
|
||||
@ -58,7 +52,6 @@ use zeromq::{Socket, SocketRecv};
|
||||
mod commit;
|
||||
mod config;
|
||||
mod daemon;
|
||||
mod electrumclient;
|
||||
mod faucet;
|
||||
mod message;
|
||||
mod scan;
|
||||
@ -69,6 +62,8 @@ use crate::{
|
||||
scan::scan_blocks,
|
||||
};
|
||||
|
||||
pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
|
||||
|
||||
type Tx = UnboundedSender<Message>;
|
||||
|
||||
type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
|
||||
@ -77,7 +72,7 @@ pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
|
||||
|
||||
pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
|
||||
|
||||
static CHAIN_TIP: AtomicU64 = AtomicU64::new(0);
|
||||
static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
|
||||
|
||||
@ -205,7 +200,7 @@ async fn handle_connection(
|
||||
our_sp_address.to_string(),
|
||||
OutPointMemberMap(members),
|
||||
OutPointProcessMap(processes),
|
||||
current_tip,
|
||||
current_tip.into(),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
@ -246,7 +241,6 @@ async fn handle_connection(
|
||||
}
|
||||
|
||||
fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
|
||||
// debug!("Creating tx message");
|
||||
let tx: Transaction = deserialize(&transaction)?;
|
||||
|
||||
if tx.is_coinbase() {
|
||||
@ -267,6 +261,44 @@ fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
|
||||
))
|
||||
}
|
||||
|
||||
async fn handle_scan_updates(scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>) {
|
||||
while let Ok(update) = scan_rx.recv() {
|
||||
log::debug!("Received scan update: {:?}", update);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>) {
|
||||
while let Ok(update) = state_rx.recv() {
|
||||
match update {
|
||||
StateUpdate::Update { blkheight, blkhash, found_outputs, found_inputs } => {
|
||||
// We update the wallet with found outputs and inputs
|
||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||
// inputs first
|
||||
for outpoint in found_inputs {
|
||||
if let Some(output) = sp_wallet.get_mut_outputs().get_mut(&outpoint) {
|
||||
output.spend_status = OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array());
|
||||
} else {
|
||||
// We found an input that we don't have in our wallet, that shouldn't happen
|
||||
error!("Spent unknown output: {:?}", outpoint);
|
||||
}
|
||||
}
|
||||
sp_wallet.get_mut_outputs().extend(found_outputs);
|
||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||
STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap();
|
||||
}
|
||||
StateUpdate::NoUpdate { blkheight }=> {
|
||||
// We just keep the current height to update the last_scan
|
||||
debug!("No update, setting last scan at {}", blkheight);
|
||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||
STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
||||
debug!("Starting listening on Core");
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
@ -297,12 +329,38 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok("hashblock") => match scan_blocks(0, &blindbit_url).await {
|
||||
Ok(_) => continue,
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
continue;
|
||||
Ok("hashblock") => {
|
||||
let current_height = DAEMON.get().unwrap().lock_anyhow().unwrap().get_current_height().unwrap();
|
||||
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
// Add retry logic for hashblock processing
|
||||
let mut retry_count = 0;
|
||||
const MAX_RETRIES: u32 = 3;
|
||||
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
|
||||
|
||||
loop {
|
||||
match scan_blocks(0, &blindbit_url).await {
|
||||
Ok(_) => {
|
||||
debug!("Successfully scanned blocks after {} retries", retry_count);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
retry_count += 1;
|
||||
if retry_count >= MAX_RETRIES {
|
||||
error!("Failed to scan blocks after {} retries: {}", MAX_RETRIES, e);
|
||||
break;
|
||||
}
|
||||
|
||||
// Exponential backoff: 1s, 2s, 4s
|
||||
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
|
||||
warn!("Scan failed (attempt {}/{}), retrying in {}ms: {}",
|
||||
retry_count, MAX_RETRIES, delay_ms, e);
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
continue;
|
||||
},
|
||||
_ => {
|
||||
error!("Unexpected message in zmq");
|
||||
@ -356,6 +414,9 @@ async fn main() -> Result<()> {
|
||||
.get_current_height()?
|
||||
.try_into()?;
|
||||
|
||||
// Set CHAIN_TIP
|
||||
CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
|
||||
app_dir.push(config.data_dir);
|
||||
let mut wallet_file = app_dir.clone();
|
||||
@ -470,16 +531,25 @@ async fn main() -> Result<()> {
|
||||
|
||||
STORAGE.set(Mutex::new(storage)).unwrap();
|
||||
|
||||
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
||||
init_update_sink(Arc::new(sink));
|
||||
|
||||
// Spawn the update handlers
|
||||
tokio::spawn(handle_scan_updates(scan_rx));
|
||||
tokio::spawn(handle_state_updates(state_rx));
|
||||
|
||||
if last_scan < current_tip {
|
||||
log::info!("Scanning for our outputs");
|
||||
scan_blocks(current_tip - last_scan, &config.blindbit_url)?;
|
||||
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
|
||||
}
|
||||
|
||||
// Set CHAIN_TIP
|
||||
CHAIN_TIP.store(current_tip as u64, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
// Subscribe to Bitcoin Core
|
||||
tokio::spawn(handle_zmq(config.zmq_url, config.blindbit_url));
|
||||
let zmq_url = config.zmq_url.clone();
|
||||
let blindbit_url = config.blindbit_url.clone();
|
||||
tokio::spawn(async move {
|
||||
handle_zmq(zmq_url, blindbit_url).await;
|
||||
});
|
||||
|
||||
// Create the event loop and TCP listener we'll accept connections on.
|
||||
let try_socket = TcpListener::bind(config.ws_url).await;
|
||||
|
134
src/scan.rs
134
src/scan.rs
@ -1,22 +1,27 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use bitcoincore_rpc::bitcoin::absolute::{Amount, Height};
|
||||
use sdk_common::silentpayments::{SpWallet, StateUpdater};
|
||||
use bitcoincore_rpc::bitcoin::absolute::Height;
|
||||
use bitcoincore_rpc::bitcoin::Amount;
|
||||
use sdk_common::silentpayments::SpWallet;
|
||||
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
|
||||
use sdk_common::sp_client::bitcoin::hex::DisplayHex;
|
||||
use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
|
||||
use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
|
||||
use sdk_common::sp_client::silentpayments::receiving::Receiver;
|
||||
use sdk_common::sp_client::silentpayments::utils::receiving::{
|
||||
calculate_tweak_data, get_pubkey_from_input,
|
||||
};
|
||||
use sdk_common::sp_client::{OutputSpendStatus, OwnedOutput, SpScanner};
|
||||
use sdk_common::sp_client::{BlindbitBackend, OutputSpendStatus, OwnedOutput, SpScanner};
|
||||
use sdk_common::updates::StateUpdater;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{MutexExt, DAEMON, STORAGE, WALLET};
|
||||
use crate::CHAIN_TIP;
|
||||
use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH};
|
||||
|
||||
pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
|
||||
let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
|
||||
@ -236,21 +241,17 @@ fn scan_block_inputs(
|
||||
|
||||
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> {
|
||||
log::info!("Starting a rescan");
|
||||
let blindbit_client = sdk_common::sp_client::BlindbitClient::new(blindbit_url.to_owned())?;
|
||||
|
||||
let mut sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let core = DAEMON
|
||||
.get()
|
||||
.ok_or(Error::msg("DAEMON not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let secp = Secp256k1::new();
|
||||
let scan_height = sp_wallet.get_last_scan();
|
||||
let tip_height: u32 = core.get_current_height()?.try_into()?;
|
||||
// Get all the data we need upfront, before any async operations
|
||||
let (sp_wallet, scan_height, tip_height) = {
|
||||
let sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
let scan_height = sp_wallet.get_last_scan();
|
||||
let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?;
|
||||
(sp_wallet.clone(), scan_height, tip_height)
|
||||
};
|
||||
|
||||
// 0 means scan to tip
|
||||
if n_blocks_to_scan == 0 {
|
||||
@ -269,81 +270,31 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyho
|
||||
}
|
||||
|
||||
let updater = StateUpdater::new();
|
||||
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
|
||||
|
||||
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
|
||||
|
||||
let keep_scanning = Arc::new(AtomicBool::new(true));
|
||||
|
||||
log::info!("start: {} end: {}", start, end);
|
||||
SpScanner::new(
|
||||
let start_time = Instant::now();
|
||||
let mut scanner = SpScanner::new(
|
||||
sp_wallet.get_sp_client().clone(),
|
||||
Box::new(updater),
|
||||
backend,
|
||||
Box::new(backend),
|
||||
owned_outpoints,
|
||||
keep_scanning
|
||||
&keep_scanning,
|
||||
);
|
||||
let mut filters: Vec<(u32, BlockHash, BlockFilter)> = vec![];
|
||||
for blkheight in start..=end {
|
||||
filters.push(core.get_filters(blkheight)?);
|
||||
}
|
||||
|
||||
let mut tweak_data_map = blindbit_client.tweaks(Height::from_consensus(start)?, Amount::from_sat(550)).await?;
|
||||
|
||||
let scan_sk = sp_wallet.get_sp_client().get_scan_key();
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
for (blkheight, blkhash, blkfilter) in filters {
|
||||
let spk2secret = match tweak_data_map.remove(&blkheight) {
|
||||
Some(tweak_data_vec) => get_script_to_secret_map(
|
||||
&sp_wallet.get_sp_client().sp_receiver,
|
||||
tweak_data_vec,
|
||||
scan_sk.into(),
|
||||
&secp,
|
||||
)?,
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
// check if new possible outputs are payments to us
|
||||
let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect();
|
||||
|
||||
// check if owned inputs are spent
|
||||
let owned_spks: Vec<Vec<u8>> = sp_wallet
|
||||
.get_outputs()
|
||||
.iter()
|
||||
.map(|(_, output)| {
|
||||
let script = output.script.to_bytes();
|
||||
script
|
||||
})
|
||||
.collect();
|
||||
|
||||
let matched = check_block(blkfilter, blkhash, candidate_spks, owned_spks)?;
|
||||
|
||||
if matched {
|
||||
let blk = core.get_block(blkhash)?;
|
||||
|
||||
// scan block for new outputs, and add them to our list
|
||||
let utxo_created_in_block = scan_block_outputs(
|
||||
&sp_wallet.get_sp_client().sp_receiver,
|
||||
&blk.txdata,
|
||||
blkheight.into(),
|
||||
spk2secret,
|
||||
)?;
|
||||
if !utxo_created_in_block.is_empty() {
|
||||
sp_wallet.get_mut_outputs().extend(utxo_created_in_block);
|
||||
}
|
||||
|
||||
// update the list of outputs just in case
|
||||
// utxos may be created and destroyed in the same block
|
||||
// search inputs and mark as mined
|
||||
let utxo_destroyed_in_block = scan_block_inputs(sp_wallet.get_outputs(), blk.txdata)?;
|
||||
if !utxo_destroyed_in_block.is_empty() {
|
||||
let outputs = sp_wallet.get_mut_outputs();
|
||||
for outpoint in utxo_destroyed_in_block {
|
||||
if let Some(output) = outputs.get_mut(&outpoint) {
|
||||
output.spend_status =
|
||||
OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case
|
||||
scanner
|
||||
.scan_blocks(
|
||||
Height::from_consensus(start)?,
|
||||
Height::from_consensus(end)?,
|
||||
dust_limit,
|
||||
WITH_CUTTHROUGH,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// time elapsed for the scan
|
||||
log::info!(
|
||||
@ -351,14 +302,5 @@ pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyho
|
||||
start_time.elapsed().as_secs()
|
||||
);
|
||||
|
||||
// update last_scan height
|
||||
sp_wallet.set_last_scan(end);
|
||||
STORAGE
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock_anyhow()?
|
||||
.wallet_file
|
||||
.save(&serde_json::to_value(sp_wallet.clone())?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user