Merge branch 'blindbit' into dev

commit fd3356c0d5
@@ -3,4 +3,5 @@ ws_url=""
 wallet_name="default"
 network="signet"
 electrum_url="tcp://localhost:60601"
+blindbit_url="tcp://localhost:8000"
 zmq_url=""
Cargo.lock (generated, 701 changes): file diff suppressed because it is too large.
@@ -5,8 +5,8 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0"
+async-trait = "0.1"
 bitcoincore-rpc = { version = "0.18" }
-electrum-client = { git = "https://github.com/cygnet3/rust-electrum-client", branch = "sp_tweaks" }
 env_logger = "0.9"
 futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
 hex = "0.4.3"
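
Note: the new async-trait dependency presumably supports the #[async_trait::async_trait] impl of SpScanner added in src/scan.rs further down; a minimal sketch of that pattern, with illustrative trait and type names that are not from this codebase:

    use async_trait::async_trait;

    // The macro rewrites `async fn` trait methods into a boxed-future form
    // that works on stable Rust.
    #[async_trait]
    trait BlockSource {
        async fn fetch_height(&self) -> anyhow::Result<u32>;
    }

    struct FixedHeight(u32);

    #[async_trait]
    impl BlockSource for FixedHeight {
        async fn fetch_height(&self) -> anyhow::Result<u32> {
            Ok(self.0)
        }
    }
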
@@ -18,8 +18,11 @@ use sdk_common::{
     sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
 };
 
-use crate::message::{broadcast_message, BroadcastType};
 use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
+use crate::{
+    message::{broadcast_message, BroadcastType},
+    CHAIN_TIP,
+};
 
 pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
     let mut processes = lock_processes()?;
@@ -48,10 +51,12 @@ pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoin
     let mut new_process_map = HashMap::new();
     let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
     new_process_map.insert(commit_msg.process_id, new_process);
+    let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
     let init_msg = HandshakeMessage::new(
         our_sp_address.to_string(),
         OutPointMemberMap(HashMap::new()),
         OutPointProcessMap(new_process_map),
+        current_tip.into(),
     );
 
     if let Err(e) = broadcast_message(
@@ -147,9 +152,8 @@ fn handle_member_list(commit_msg: &CommitMessage) -> Result<OutPoint> {
         return Err(Error::msg("Process is not a pairing process"));
     }
 
-    if let Some(paired_addresses) = commit_msg.public_data.get("pairedAddresses") {
-        let paired_addresses: Vec<SilentPaymentAddress> =
-            sdk_common::serialization::ciborium_deserialize(paired_addresses)?;
+    if let Ok(paired_addresses) = commit_msg.public_data.get_as_json("pairedAddresses") {
+        let paired_addresses: Vec<SilentPaymentAddress> = serde_json::from_value(paired_addresses.clone())?;
         let mut memberlist = lock_members()?;
         memberlist.insert(commit_msg.process_id, Member::new(paired_addresses));
         return Ok(commit_msg.process_id);
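
Note: the pairing payload is now read as JSON instead of CBOR. A stand-alone sketch of the serde_json::from_value step, with String standing in for SilentPaymentAddress and purely illustrative values:

    fn main() -> anyhow::Result<()> {
        // A JSON array as it might appear under the "pairedAddresses" key.
        let value: serde_json::Value =
            serde_json::json!(["sp1exampleaddressone", "sp1exampleaddresstwo"]);

        // Same conversion step as in handle_member_list above.
        let paired: Vec<String> = serde_json::from_value(value.clone())?;
        assert_eq!(paired.len(), 2);
        Ok(())
    }
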
@@ -13,7 +13,7 @@ pub struct Config {
     pub ws_url: String,
     pub wallet_name: String,
     pub network: Network,
-    pub electrum_url: String,
+    pub blindbit_url: String,
     pub zmq_url: String,
     pub data_dir: String,
 }
@@ -63,9 +63,9 @@ impl Config {
                 .ok_or(Error::msg("no \"network\""))?
                 .trim_matches('\"'),
             )?,
-            electrum_url: file_content
-                .remove("electrum_url")
-                .ok_or(Error::msg("No \"electrum_url\""))?
+            blindbit_url: file_content
+                .remove("blindbit_url")
+                .ok_or(Error::msg("No \"blindbit_url\""))?
                 .to_owned(),
             zmq_url: file_content
                 .remove("zmq_url")
@@ -73,8 +73,7 @@ impl Config {
                 .to_owned(),
             data_dir: file_content
                 .remove("data_dir")
-                .ok_or(Error::msg("No \"data_dir\""))?
-                .to_owned(),
+                .unwrap_or_else(|| ".4nk".to_string()),
         };
 
         Ok(config)
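
Note: data_dir is now optional and defaults to ".4nk". A minimal sketch of the remove(..).unwrap_or_else(..) fallback on a plain HashMap, mirroring the parsing above:

    use std::collections::HashMap;

    fn main() {
        let mut file_content: HashMap<String, String> = HashMap::new();
        file_content.insert("network".to_string(), "signet".to_string());
        // "data_dir" is deliberately absent.

        // Take the key if present, otherwise fall back to ".4nk".
        let data_dir = file_content
            .remove("data_dir")
            .unwrap_or_else(|| ".4nk".to_string());
        assert_eq!(data_dir, ".4nk");
    }
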
@@ -1,14 +0,0 @@
-use electrum_client::{Client, ConfigBuilder};
-use log::info;
-
-const VALIDATE_DOMAIN: bool = false; // self-signed cert, so we don't validate
-
-pub fn create_electrum_client(electrum_url: &str) -> anyhow::Result<Client> {
-    let config = ConfigBuilder::new()
-        .validate_domain(VALIDATE_DOMAIN)
-        .build();
-    let electrum_client = Client::from_config(electrum_url, config)?;
-    info!("ssl client {}", electrum_url);
-
-    Ok(electrum_client)
-}
@@ -95,7 +95,7 @@ fn faucet_send(
 
     // We filter out the freezed utxos from available list
     let available_outpoints: Vec<(OutPoint, OwnedOutput)> = sp_wallet
-        .get_outputs()
+        .get_unspent_outputs()
         .iter()
         .filter_map(|(outpoint, output)| {
             if !freezed_utxos.contains(&outpoint) {
src/main.rs (159 changes)
@@ -7,11 +7,11 @@ use std::{
     net::SocketAddr,
     path::PathBuf,
     str::FromStr,
-    sync::{atomic::AtomicU64, Mutex, MutexGuard, OnceLock},
+    sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use bitcoincore_rpc::{
-    bitcoin::secp256k1::SecretKey,
+    bitcoin::{hashes::Hash, secp256k1::SecretKey},
     json::{self as bitcoin_json},
 };
 use commit::{lock_members, MEMBERLIST};
@@ -19,15 +19,7 @@ use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
 use log::{debug, error, warn};
 use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
 use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
-use sdk_common::sp_client::{
-    bitcoin::secp256k1::rand::{thread_rng, Rng},
-    bitcoin::OutPoint,
-    SpClient, SpendKey,
-};
-use sdk_common::{
-    error::AnkError,
-    network::{AnkFlag, NewTxMessage},
-};
+use sdk_common::network::{AnkFlag, NewTxMessage};
 use sdk_common::{
     network::HandshakeMessage,
     pcd::Member,
@@ -41,10 +33,16 @@ use sdk_common::{
             Amount, Network, Transaction,
         },
         silentpayments::SilentPaymentAddress,
-        OwnedOutput,
     },
     MutexExt,
 };
+use sdk_common::{
+    sp_client::{
+        bitcoin::{secp256k1::rand::thread_rng, OutPoint},
+        OutputSpendStatus, SpClient, SpendKey,
+    },
+    updates::{init_update_sink, NativeUpdateSink, StateUpdate},
+};
 
 use serde_json::Value;
 use tokio::net::{TcpListener, TcpStream};
@@ -58,7 +56,6 @@ use zeromq::{Socket, SocketRecv};
 mod commit;
 mod config;
 mod daemon;
-mod electrumclient;
 mod faucet;
 mod message;
 mod scan;
@@ -69,6 +66,8 @@ use crate::{
     scan::scan_blocks,
 };
 
+pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
+
 type Tx = UnboundedSender<Message>;
 
 type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
@@ -77,7 +76,7 @@ pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
 
 pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
 
-static CHAIN_TIP: AtomicU64 = AtomicU64::new(0);
+static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
 
 pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
 
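
Note: block heights fit in a u32, so CHAIN_TIP shrinks from AtomicU64 to AtomicU32. A self-contained sketch of the store/load pattern used around it (the static is redeclared here only for illustration):

    use std::sync::atomic::{AtomicU32, Ordering};

    static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);

    fn main() {
        // Writer side: a new block was seen, record its height.
        CHAIN_TIP.store(850_000, Ordering::SeqCst);

        // Reader side: e.g. when building a handshake message.
        assert_eq!(CHAIN_TIP.load(Ordering::SeqCst), 850_000);
    }
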
@@ -205,7 +204,7 @@ async fn handle_connection(
         our_sp_address.to_string(),
         OutPointMemberMap(members),
         OutPointProcessMap(processes),
-        current_tip,
+        current_tip.into(),
     );
 
     if let Err(e) = broadcast_message(
@@ -246,7 +245,6 @@ async fn handle_connection(
 }
 
 fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
-    // debug!("Creating tx message");
     let tx: Transaction = deserialize(&transaction)?;
 
     if tx.is_coinbase() {
@@ -267,7 +265,63 @@ fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
     ))
 }
 
-async fn handle_zmq(zmq_url: String, electrum_url: String) {
+async fn handle_scan_updates(
+    scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
+) {
+    while let Ok(update) = scan_rx.recv() {
+        log::debug!("Received scan update: {:?}", update);
+    }
+}
+
+async fn handle_state_updates(
+    state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
+) {
+    while let Ok(update) = state_rx.recv() {
+        match update {
+            StateUpdate::Update {
+                blkheight,
+                blkhash,
+                found_outputs,
+                found_inputs,
+            } => {
+                // We update the wallet with found outputs and inputs
+                let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
+                // inputs first
+                for outpoint in found_inputs {
+                    sp_wallet.mark_output_mined(&outpoint, blkhash);
+                }
+                sp_wallet.get_mut_outputs().extend(found_outputs);
+                sp_wallet.set_last_scan(blkheight.to_consensus_u32());
+                let json = serde_json::to_value(sp_wallet.clone()).unwrap();
+                STORAGE
+                    .get()
+                    .unwrap()
+                    .lock_anyhow()
+                    .unwrap()
+                    .wallet_file
+                    .save(&json)
+                    .unwrap();
+            }
+            StateUpdate::NoUpdate { blkheight } => {
+                // We just keep the current height to update the last_scan
+                debug!("No update, setting last scan at {}", blkheight);
+                let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
+                sp_wallet.set_last_scan(blkheight.to_consensus_u32());
+                let json = serde_json::to_value(sp_wallet.clone()).unwrap();
+                STORAGE
+                    .get()
+                    .unwrap()
+                    .lock_anyhow()
+                    .unwrap()
+                    .wallet_file
+                    .save(&json)
+                    .unwrap();
+            }
+        }
+    }
+}
+
+async fn handle_zmq(zmq_url: String, blindbit_url: String) {
     debug!("Starting listening on Core");
     let mut socket = zeromq::SubSocket::new();
     socket.connect(&zmq_url).await.unwrap();
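
Note: the rough shape of the StateUpdate messages consumed by handle_state_updates above, inferred from the match arms only; the placeholder types keep the sketch self-contained, and the real definition lives in sdk_common::updates:

    use std::collections::{HashMap, HashSet};

    // Placeholders so the sketch stands alone; in the daemon these are the
    // bitcoin Height/BlockHash/OutPoint types and sdk_common's OwnedOutput.
    type Height = u32;
    type BlockHash = [u8; 32];
    type OutPoint = ([u8; 32], u32);
    type OwnedOutput = ();

    // Assumed shape only, not the authoritative enum.
    #[allow(dead_code)]
    enum StateUpdateSketch {
        Update {
            blkheight: Height,
            blkhash: BlockHash,
            found_outputs: HashMap<OutPoint, OwnedOutput>,
            found_inputs: HashSet<OutPoint>,
        },
        NoUpdate {
            blkheight: Height,
        },
    }

    fn main() {}
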
@@ -297,13 +351,51 @@ async fn handle_zmq(zmq_url: String, electrum_url: String) {
                     continue;
                 }
             },
-            Ok("hashblock") => match scan_blocks(0, &electrum_url) {
-                Ok(_) => continue,
-                Err(e) => {
-                    error!("{}", e);
-                    continue;
-                }
-            },
+            Ok("hashblock") => {
+                let current_height = DAEMON
+                    .get()
+                    .unwrap()
+                    .lock_anyhow()
+                    .unwrap()
+                    .get_current_height()
+                    .unwrap();
+                CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
+
+                // Add retry logic for hashblock processing
+                let mut retry_count = 0;
+                const MAX_RETRIES: u32 = 4;
+                const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
+
+                loop {
+                    match scan_blocks(0, &blindbit_url).await {
+                        Ok(_) => {
+                            debug!("Successfully scanned blocks after {} retries", retry_count);
+                            break;
+                        }
+                        Err(e) => {
+                            retry_count += 1;
+                            if retry_count >= MAX_RETRIES {
+                                error!(
+                                    "Failed to scan blocks after {} retries: {}",
+                                    MAX_RETRIES, e
+                                );
+                                break;
+                            }
+
+                            // Exponential backoff: 1s, 2s, 4s
+                            let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
+                            warn!(
+                                "Scan failed (attempt {}/{}), retrying in {}ms: {}",
+                                retry_count, MAX_RETRIES, delay_ms, e
+                            );
+
+                            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
+                                .await;
+                        }
+                    }
+                }
+                continue;
+            }
             _ => {
                 error!("Unexpected message in zmq");
                 continue;
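
Note: the retry delay doubles per attempt. A tiny stand-alone check of the backoff arithmetic used above:

    fn main() {
        const RETRY_DELAY_MS: u64 = 1000;

        // retry_count is 1-based when the delay is computed, so the sequence
        // is 1000 ms, 2000 ms, 4000 ms before giving up at MAX_RETRIES = 4.
        let delays: Vec<u64> = (1u32..4)
            .map(|retry_count| RETRY_DELAY_MS * (1 << (retry_count - 1)))
            .collect();
        assert_eq!(delays, vec![1000, 2000, 4000]);
    }
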
@@ -356,6 +448,9 @@ async fn main() -> Result<()> {
         .get_current_height()?
         .try_into()?;
 
+    // Set CHAIN_TIP
+    CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
+
     let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
     app_dir.push(config.data_dir);
     let mut wallet_file = app_dir.clone();
||||||
@ -470,16 +565,24 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
STORAGE.set(Mutex::new(storage)).unwrap();
|
STORAGE.set(Mutex::new(storage)).unwrap();
|
||||||
|
|
||||||
|
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
||||||
|
init_update_sink(Arc::new(sink));
|
||||||
|
|
||||||
|
// Spawn the update handlers
|
||||||
|
tokio::spawn(handle_scan_updates(scan_rx));
|
||||||
|
tokio::spawn(handle_state_updates(state_rx));
|
||||||
|
|
||||||
if last_scan < current_tip {
|
if last_scan < current_tip {
|
||||||
log::info!("Scanning for our outputs");
|
log::info!("Scanning for our outputs");
|
||||||
scan_blocks(current_tip - last_scan, &config.electrum_url)?;
|
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Set CHAIN_TIP
|
|
||||||
CHAIN_TIP.store(current_tip as u64, std::sync::atomic::Ordering::SeqCst);
|
|
||||||
|
|
||||||
// Subscribe to Bitcoin Core
|
// Subscribe to Bitcoin Core
|
||||||
tokio::spawn(handle_zmq(config.zmq_url, config.electrum_url));
|
let zmq_url = config.zmq_url.clone();
|
||||||
|
let blindbit_url = config.blindbit_url.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
handle_zmq(zmq_url, blindbit_url).await;
|
||||||
|
});
|
||||||
|
|
||||||
// Create the event loop and TCP listener we'll accept connections on.
|
// Create the event loop and TCP listener we'll accept connections on.
|
||||||
let try_socket = TcpListener::bind(config.ws_url).await;
|
let try_socket = TcpListener::bind(config.ws_url).await;
|
||||||
src/scan.rs (443 changes)
@@ -1,24 +1,38 @@
 use std::collections::HashMap;
+use std::collections::HashSet;
 use std::str::FromStr;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
 use std::sync::MutexGuard;
 
+use anyhow::bail;
 use anyhow::{Error, Result};
 use bitcoincore_rpc::bitcoin::absolute::Height;
+use bitcoincore_rpc::bitcoin::hashes::sha256;
 use bitcoincore_rpc::bitcoin::hashes::Hash;
-use electrum_client::ElectrumApi;
+use bitcoincore_rpc::bitcoin::Amount;
+use futures_util::Stream;
+use log::info;
 use sdk_common::silentpayments::SpWallet;
 use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
-use sdk_common::sp_client::bitcoin::hex::DisplayHex;
 use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
 use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
 use sdk_common::sp_client::silentpayments::receiving::Receiver;
 use sdk_common::sp_client::silentpayments::utils::receiving::{
     calculate_tweak_data, get_pubkey_from_input,
 };
-use sdk_common::sp_client::{OutputSpendStatus, OwnedOutput};
+use sdk_common::sp_client::BlockData;
+use sdk_common::sp_client::ChainBackend;
+use sdk_common::sp_client::FilterData;
+use sdk_common::sp_client::SpClient;
+use sdk_common::sp_client::Updater;
+use sdk_common::sp_client::{BlindbitBackend, OutputSpendStatus, OwnedOutput, SpScanner};
+use sdk_common::updates::StateUpdater;
 use tokio::time::Instant;
 
-use crate::{electrumclient, MutexExt, DAEMON, STORAGE, WALLET};
+use crate::CHAIN_TIP;
+use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH};
 
 pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
     let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
@@ -236,23 +250,318 @@ fn scan_block_inputs(
     Ok(found)
 }
 
-pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Result<()> {
+pub struct NativeSpScanner<'a> {
+    updater: Box<dyn Updater + Sync + Send>,
+    backend: Box<dyn ChainBackend + Sync + Send>,
+    client: SpClient,
+    keep_scanning: &'a AtomicBool,      // used to interrupt scanning
+    owned_outpoints: HashSet<OutPoint>, // used to scan block inputs
+}
+
+impl<'a> NativeSpScanner<'a> {
+    pub fn new(
+        client: SpClient,
+        updater: Box<dyn Updater + Sync + Send>,
+        backend: Box<dyn ChainBackend + Sync + Send>,
+        owned_outpoints: HashSet<OutPoint>,
+        keep_scanning: &'a AtomicBool,
+    ) -> Self {
+        Self {
+            client,
+            updater,
+            backend,
+            owned_outpoints,
+            keep_scanning,
+        }
+    }
+
+    pub async fn process_blocks(
+        &mut self,
+        start: Height,
+        end: Height,
+        block_data_stream: impl Stream<Item = Result<BlockData>> + Unpin + Send,
+    ) -> Result<()> {
+        use sdk_common::sp_client::futures::StreamExt;
+        use std::time::{Duration, Instant};
+
+        let mut update_time = Instant::now();
+        let mut stream = block_data_stream;
+
+        while let Some(blockdata) = stream.next().await {
+            let blockdata = blockdata?;
+            let blkheight = blockdata.blkheight;
+            let blkhash = blockdata.blkhash;
+
+            // stop scanning and return if interrupted
+            if self.should_interrupt() {
+                self.save_state()?;
+                return Ok(());
+            }
+
+            let mut save_to_storage = false;
+
+            // always save on last block or after 30 seconds since last save
+            if blkheight == end || update_time.elapsed() > Duration::from_secs(30) {
+                save_to_storage = true;
+            }
+
+            let (found_outputs, found_inputs) = self.process_block(blockdata).await?;
+
+            if !found_outputs.is_empty() {
+                save_to_storage = true;
+                self.record_outputs(blkheight, blkhash, found_outputs)?;
+            }
+
+            if !found_inputs.is_empty() {
+                save_to_storage = true;
+                self.record_inputs(blkheight, blkhash, found_inputs)?;
+            }
+
+            // tell the updater we scanned this block
+            self.record_progress(start, blkheight, end)?;
+
+            if save_to_storage {
+                self.save_state()?;
+                update_time = Instant::now();
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a> SpScanner for NativeSpScanner<'a> {
+    async fn scan_blocks(
+        &mut self,
+        start: Height,
+        end: Height,
+        dust_limit: Amount,
+        with_cutthrough: bool,
+    ) -> Result<()> {
+        if start > end {
+            bail!("bigger start than end: {} > {}", start, end);
+        }
+
+        info!("start: {} end: {}", start, end);
+        let start_time: Instant = Instant::now();
+
+        // get block data stream
+        let range = start.to_consensus_u32()..=end.to_consensus_u32();
+        let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough);
+
+        // process blocks using block data stream
+        self.process_blocks(start, end, block_data_stream).await?;
+
+        // time elapsed for the scan
+        info!(
+            "Blindbit scan complete in {} seconds",
+            start_time.elapsed().as_secs()
+        );
+
+        Ok(())
+    }
+
+    async fn process_block(
+        &mut self,
+        blockdata: BlockData,
+    ) -> Result<(HashMap<OutPoint, OwnedOutput>, HashSet<OutPoint>)> {
+        let BlockData {
+            blkheight,
+            tweaks,
+            new_utxo_filter,
+            spent_filter,
+            ..
+        } = blockdata;
+
+        let outs = self
+            .process_block_outputs(blkheight, tweaks, new_utxo_filter)
+            .await?;
+
+        // after processing outputs, we add the found outputs to our list
+        self.owned_outpoints.extend(outs.keys());
+
+        let ins = self.process_block_inputs(blkheight, spent_filter).await?;
+
+        // after processing inputs, we remove the found inputs
+        self.owned_outpoints.retain(|item| !ins.contains(item));
+
+        Ok((outs, ins))
+    }
+
+    async fn process_block_outputs(
+        &self,
+        blkheight: Height,
+        tweaks: Vec<PublicKey>,
+        new_utxo_filter: FilterData,
+    ) -> Result<HashMap<OutPoint, OwnedOutput>> {
+        let mut res = HashMap::new();
+
+        if !tweaks.is_empty() {
+            let secrets_map = self.client.get_script_to_secret_map(tweaks)?;
+
+            //last_scan = last_scan.max(n as u32);
+            let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect();
+
+            //get block gcs & check match
+            let blkfilter = BlockFilter::new(&new_utxo_filter.data);
+            let blkhash = new_utxo_filter.block_hash;
+
+            let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?;
+
+            //if match: fetch and scan utxos
+            if matched_outputs {
+                info!("matched outputs on: {}", blkheight);
+                let found = self.scan_utxos(blkheight, secrets_map).await?;
+
+                if !found.is_empty() {
+                    for (label, utxo, tweak) in found {
+                        let outpoint = OutPoint {
+                            txid: utxo.txid,
+                            vout: utxo.vout,
+                        };
+
+                        let out = OwnedOutput {
+                            blockheight: blkheight,
+                            tweak: tweak.to_be_bytes(),
+                            amount: utxo.value,
+                            script: utxo.scriptpubkey,
+                            label,
+                            spend_status: OutputSpendStatus::Unspent,
+                        };
+
+                        res.insert(outpoint, out);
+                    }
+                }
+            }
+        }
+        Ok(res)
+    }
+
+    async fn process_block_inputs(
+        &self,
+        blkheight: Height,
+        spent_filter: FilterData,
+    ) -> Result<HashSet<OutPoint>> {
+        let mut res = HashSet::new();
+
+        let blkhash = spent_filter.block_hash;
+
+        // first get the 8-byte hashes used to construct the input filter
+        let input_hashes_map = self.get_input_hashes(blkhash)?;
+
+        // check against filter
+        let blkfilter = BlockFilter::new(&spent_filter.data);
+        let matched_inputs = self.check_block_inputs(
+            blkfilter,
+            blkhash,
+            input_hashes_map.keys().cloned().collect(),
+        )?;
+
+        // if match: download spent data, collect the outpoints that are spent
+        if matched_inputs {
+            info!("matched inputs on: {}", blkheight);
+            let spent = self.backend.spent_index(blkheight).await?.data;
+
+            for spent in spent {
+                let hex: &[u8] = spent.as_ref();
+
+                if let Some(outpoint) = input_hashes_map.get(hex) {
+                    res.insert(*outpoint);
+                }
+            }
+        }
+        Ok(res)
+    }
+
+    fn get_block_data_stream(
+        &self,
+        range: std::ops::RangeInclusive<u32>,
+        dust_limit: Amount,
+        with_cutthrough: bool,
+    ) -> std::pin::Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>> {
+        self.backend
+            .get_block_data_for_range(range, dust_limit, with_cutthrough)
+    }
+
+    fn should_interrupt(&self) -> bool {
+        !self
+            .keep_scanning
+            .load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    fn save_state(&mut self) -> Result<()> {
+        self.updater.save_to_persistent_storage()
+    }
+
+    fn record_outputs(
+        &mut self,
+        height: Height,
+        block_hash: BlockHash,
+        outputs: HashMap<OutPoint, OwnedOutput>,
+    ) -> Result<()> {
+        self.updater.record_block_outputs(height, block_hash, outputs)
+    }
+
+    fn record_inputs(
+        &mut self,
+        height: Height,
+        block_hash: BlockHash,
+        inputs: HashSet<OutPoint>,
+    ) -> Result<()> {
+        self.updater.record_block_inputs(height, block_hash, inputs)
+    }
+
+    fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> {
+        self.updater.record_scan_progress(start, current, end)
+    }
+
+    fn client(&self) -> &SpClient {
+        &self.client
+    }
+
+    fn backend(&self) -> &dyn ChainBackend {
+        self.backend.as_ref()
+    }
+
+    fn updater(&mut self) -> &mut dyn Updater {
+        self.updater.as_mut()
+    }
+
+    // Override the default get_input_hashes implementation to use owned_outpoints
+    fn get_input_hashes(&self, blkhash: BlockHash) -> Result<HashMap<[u8; 8], OutPoint>> {
+        let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new();
+
+        for outpoint in &self.owned_outpoints {
+            let mut arr = [0u8; 68];
+            arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array());
+            arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes());
+            arr[36..].copy_from_slice(&blkhash.to_byte_array());
+            let hash = sha256::Hash::hash(&arr);
+
+            let mut res = [0u8; 8];
+            res.copy_from_slice(&hash[..8]);
+
+            map.insert(res, outpoint.clone());
+        }
+
+        Ok(map)
+    }
+}
+
+pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> {
     log::info!("Starting a rescan");
-    let electrum_client = electrumclient::create_electrum_client(electrum_url)?;
 
-    let mut sp_wallet = WALLET
-        .get()
-        .ok_or(Error::msg("Wallet not initialized"))?
-        .lock_anyhow()?;
-
-    let core = DAEMON
-        .get()
-        .ok_or(Error::msg("DAEMON not initialized"))?
-        .lock_anyhow()?;
-
-    let secp = Secp256k1::new();
-    let scan_height = sp_wallet.get_last_scan();
-    let tip_height: u32 = core.get_current_height()?.try_into()?;
+    // Get all the data we need upfront, before any async operations
+    let (sp_wallet, scan_height, tip_height) = {
+        let sp_wallet = WALLET
+            .get()
+            .ok_or(Error::msg("Wallet not initialized"))?
+            .lock_anyhow()?;
+        let scan_height = sp_wallet.get_last_scan();
+        let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?;
+        (sp_wallet.clone(), scan_height, tip_height)
+    };
 
     // 0 means scan to tip
     if n_blocks_to_scan == 0 {
@@ -270,73 +579,32 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res
         return Ok(());
     }
 
+    let updater = StateUpdater::new();
+    let backend = BlindbitBackend::new(blindbit_url.to_string())?;
+
+    let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
+
+    let keep_scanning = Arc::new(AtomicBool::new(true));
+
     log::info!("start: {} end: {}", start, end);
-    let mut filters: Vec<(u32, BlockHash, BlockFilter)> = vec![];
-    for blkheight in start..=end {
-        filters.push(core.get_filters(blkheight)?);
-    }
-
-    let mut tweak_data_map = electrum_client.sp_tweaks(start as usize)?;
-
-    let scan_sk = sp_wallet.get_sp_client().get_scan_key();
-
     let start_time = Instant::now();
-    for (blkheight, blkhash, blkfilter) in filters {
-        let spk2secret = match tweak_data_map.remove(&blkheight) {
-            Some(tweak_data_vec) => get_script_to_secret_map(
-                &sp_wallet.get_sp_client().sp_receiver,
-                tweak_data_vec,
-                scan_sk.into(),
-                &secp,
-            )?,
-            None => HashMap::new(),
-        };
-
-        // check if new possible outputs are payments to us
-        let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect();
-
-        // check if owned inputs are spent
-        let owned_spks: Vec<Vec<u8>> = sp_wallet
-            .get_outputs()
-            .iter()
-            .map(|(_, output)| {
-                let script = output.script.to_bytes();
-                script
-            })
-            .collect();
-
-        let matched = check_block(blkfilter, blkhash, candidate_spks, owned_spks)?;
-
-        if matched {
-            let blk = core.get_block(blkhash)?;
-
-            // scan block for new outputs, and add them to our list
-            let utxo_created_in_block = scan_block_outputs(
-                &sp_wallet.get_sp_client().sp_receiver,
-                &blk.txdata,
-                blkheight.into(),
-                spk2secret,
-            )?;
-            if !utxo_created_in_block.is_empty() {
-                sp_wallet.get_mut_outputs().extend(utxo_created_in_block);
-            }
-
-            // update the list of outputs just in case
-            // utxos may be created and destroyed in the same block
-            // search inputs and mark as mined
-            let utxo_destroyed_in_block = scan_block_inputs(sp_wallet.get_outputs(), blk.txdata)?;
-            if !utxo_destroyed_in_block.is_empty() {
-                let outputs = sp_wallet.get_mut_outputs();
-                for outpoint in utxo_destroyed_in_block {
-                    if let Some(output) = outputs.get_mut(&outpoint) {
-                        output.spend_status =
-                            OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array());
-                    }
-                }
-            }
-        }
-    }
+    let mut scanner = NativeSpScanner::new(
+        sp_wallet.get_sp_client().clone(),
+        Box::new(updater),
+        Box::new(backend),
+        owned_outpoints,
+        &keep_scanning,
+    );
+
+    let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case
+    scanner
+        .scan_blocks(
+            Height::from_consensus(start)?,
+            Height::from_consensus(end)?,
+            dust_limit,
+            WITH_CUTTHROUGH,
+        )
+        .await?;
 
     // time elapsed for the scan
     log::info!(
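
Note: the scanner checks keep_scanning between blocks (should_interrupt above), so a scan can be cancelled by clearing the flag from elsewhere. A minimal sketch of that mechanism with a placeholder worker, not the daemon's actual task layout:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    fn main() {
        let keep_scanning = Arc::new(AtomicBool::new(true));

        // Hand a clone to whatever owns the long-running scan.
        let flag = Arc::clone(&keep_scanning);
        let worker = std::thread::spawn(move || {
            let mut blocks_done = 0u64;
            // Equivalent of should_interrupt(): stop once the flag is cleared.
            while flag.load(Ordering::Relaxed) && blocks_done < 100_000_000 {
                blocks_done += 1;
            }
            blocks_done
        });

        // From anywhere else: request the scan to stop.
        keep_scanning.store(false, Ordering::Relaxed);
        println!("stopped after {} iterations", worker.join().unwrap());
    }
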
@@ -344,14 +612,5 @@ pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Res
         start_time.elapsed().as_secs()
     );
 
-    // update last_scan height
-    sp_wallet.set_last_scan(end);
-    STORAGE
-        .get()
-        .unwrap()
-        .lock_anyhow()?
-        .wallet_file
-        .save(&serde_json::to_value(sp_wallet.clone())?)?;
-
     Ok(())
 }