Cargo fmt
This commit is contained in:
parent
2dadcef03c
commit
7d5375e5df
@ -18,11 +18,11 @@ use sdk_common::{
|
|||||||
sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
|
sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
|
||||||
CHAIN_TIP,
|
|
||||||
message::{broadcast_message, BroadcastType},
|
|
||||||
};
|
|
||||||
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
|
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
|
||||||
|
use crate::{
|
||||||
|
message::{broadcast_message, BroadcastType},
|
||||||
|
CHAIN_TIP,
|
||||||
|
};
|
||||||
|
|
||||||
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||||
let mut processes = lock_processes()?;
|
let mut processes = lock_processes()?;
|
||||||
|
83
src/main.rs
83
src/main.rs
@ -19,9 +19,6 @@ use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
|||||||
use log::{debug, error, warn};
|
use log::{debug, error, warn};
|
||||||
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
||||||
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
|
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
|
||||||
use sdk_common::{sp_client::{
|
|
||||||
bitcoin::{secp256k1::rand::thread_rng, OutPoint}, OutputSpendStatus, SpClient, SpendKey
|
|
||||||
}, updates::{init_update_sink, NativeUpdateSink, StateUpdate}};
|
|
||||||
use sdk_common::network::{AnkFlag, NewTxMessage};
|
use sdk_common::network::{AnkFlag, NewTxMessage};
|
||||||
use sdk_common::{
|
use sdk_common::{
|
||||||
network::HandshakeMessage,
|
network::HandshakeMessage,
|
||||||
@ -39,6 +36,13 @@ use sdk_common::{
|
|||||||
},
|
},
|
||||||
MutexExt,
|
MutexExt,
|
||||||
};
|
};
|
||||||
|
use sdk_common::{
|
||||||
|
sp_client::{
|
||||||
|
bitcoin::{secp256k1::rand::thread_rng, OutPoint},
|
||||||
|
OutputSpendStatus, SpClient, SpendKey,
|
||||||
|
},
|
||||||
|
updates::{init_update_sink, NativeUpdateSink, StateUpdate},
|
||||||
|
};
|
||||||
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
@ -261,22 +265,32 @@ fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_scan_updates(scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>) {
|
async fn handle_scan_updates(
|
||||||
|
scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
|
||||||
|
) {
|
||||||
while let Ok(update) = scan_rx.recv() {
|
while let Ok(update) = scan_rx.recv() {
|
||||||
log::debug!("Received scan update: {:?}", update);
|
log::debug!("Received scan update: {:?}", update);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>) {
|
async fn handle_state_updates(
|
||||||
|
state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
|
||||||
|
) {
|
||||||
while let Ok(update) = state_rx.recv() {
|
while let Ok(update) = state_rx.recv() {
|
||||||
match update {
|
match update {
|
||||||
StateUpdate::Update { blkheight, blkhash, found_outputs, found_inputs } => {
|
StateUpdate::Update {
|
||||||
|
blkheight,
|
||||||
|
blkhash,
|
||||||
|
found_outputs,
|
||||||
|
found_inputs,
|
||||||
|
} => {
|
||||||
// We update the wallet with found outputs and inputs
|
// We update the wallet with found outputs and inputs
|
||||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||||
// inputs first
|
// inputs first
|
||||||
for outpoint in found_inputs {
|
for outpoint in found_inputs {
|
||||||
if let Some(output) = sp_wallet.get_mut_outputs().get_mut(&outpoint) {
|
if let Some(output) = sp_wallet.get_mut_outputs().get_mut(&outpoint) {
|
||||||
output.spend_status = OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array());
|
output.spend_status =
|
||||||
|
OutputSpendStatus::Mined(blkhash.as_raw_hash().to_byte_array());
|
||||||
} else {
|
} else {
|
||||||
// We found an input that we don't have in our wallet, that shouldn't happen
|
// We found an input that we don't have in our wallet, that shouldn't happen
|
||||||
error!("Spent unknown output: {:?}", outpoint);
|
error!("Spent unknown output: {:?}", outpoint);
|
||||||
@ -285,15 +299,29 @@ async fn handle_state_updates(state_rx: std::sync::mpsc::Receiver<sdk_common::up
|
|||||||
sp_wallet.get_mut_outputs().extend(found_outputs);
|
sp_wallet.get_mut_outputs().extend(found_outputs);
|
||||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||||
STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap();
|
STORAGE
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock_anyhow()
|
||||||
|
.unwrap()
|
||||||
|
.wallet_file
|
||||||
|
.save(&json)
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
StateUpdate::NoUpdate { blkheight }=> {
|
StateUpdate::NoUpdate { blkheight } => {
|
||||||
// We just keep the current height to update the last_scan
|
// We just keep the current height to update the last_scan
|
||||||
debug!("No update, setting last scan at {}", blkheight);
|
debug!("No update, setting last scan at {}", blkheight);
|
||||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||||
STORAGE.get().unwrap().lock_anyhow().unwrap().wallet_file.save(&json).unwrap();
|
STORAGE
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock_anyhow()
|
||||||
|
.unwrap()
|
||||||
|
.wallet_file
|
||||||
|
.save(&json)
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -330,14 +358,20 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok("hashblock") => {
|
Ok("hashblock") => {
|
||||||
let current_height = DAEMON.get().unwrap().lock_anyhow().unwrap().get_current_height().unwrap();
|
let current_height = DAEMON
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock_anyhow()
|
||||||
|
.unwrap()
|
||||||
|
.get_current_height()
|
||||||
|
.unwrap();
|
||||||
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
|
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
// Add retry logic for hashblock processing
|
// Add retry logic for hashblock processing
|
||||||
let mut retry_count = 0;
|
let mut retry_count = 0;
|
||||||
const MAX_RETRIES: u32 = 3;
|
const MAX_RETRIES: u32 = 3;
|
||||||
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
|
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match scan_blocks(0, &blindbit_url).await {
|
match scan_blocks(0, &blindbit_url).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@ -347,21 +381,27 @@ async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
retry_count += 1;
|
retry_count += 1;
|
||||||
if retry_count >= MAX_RETRIES {
|
if retry_count >= MAX_RETRIES {
|
||||||
error!("Failed to scan blocks after {} retries: {}", MAX_RETRIES, e);
|
error!(
|
||||||
|
"Failed to scan blocks after {} retries: {}",
|
||||||
|
MAX_RETRIES, e
|
||||||
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exponential backoff: 1s, 2s, 4s
|
// Exponential backoff: 1s, 2s, 4s
|
||||||
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
|
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
|
||||||
warn!("Scan failed (attempt {}/{}), retrying in {}ms: {}",
|
warn!(
|
||||||
retry_count, MAX_RETRIES, delay_ms, e);
|
"Scan failed (attempt {}/{}), retrying in {}ms: {}",
|
||||||
|
retry_count, MAX_RETRIES, delay_ms, e
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
|
);
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
},
|
}
|
||||||
_ => {
|
_ => {
|
||||||
error!("Unexpected message in zmq");
|
error!("Unexpected message in zmq");
|
||||||
continue;
|
continue;
|
||||||
@ -533,7 +573,7 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
||||||
init_update_sink(Arc::new(sink));
|
init_update_sink(Arc::new(sink));
|
||||||
|
|
||||||
// Spawn the update handlers
|
// Spawn the update handlers
|
||||||
tokio::spawn(handle_scan_updates(scan_rx));
|
tokio::spawn(handle_scan_updates(scan_rx));
|
||||||
tokio::spawn(handle_state_updates(state_rx));
|
tokio::spawn(handle_state_updates(state_rx));
|
||||||
@ -543,7 +583,6 @@ async fn main() -> Result<()> {
|
|||||||
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
|
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Subscribe to Bitcoin Core
|
// Subscribe to Bitcoin Core
|
||||||
let zmq_url = config.zmq_url.clone();
|
let zmq_url = config.zmq_url.clone();
|
||||||
let blindbit_url = config.blindbit_url.clone();
|
let blindbit_url = config.blindbit_url.clone();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user