use std::{ collections::{HashMap, HashSet}, env, fmt::Debug, fs, io::{Read, Write}, net::SocketAddr, path::PathBuf, str::FromStr, sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock}, }; use bitcoincore_rpc::{ bitcoin::secp256k1::SecretKey, json::{self as bitcoin_json}, }; use commit::{lock_members, MEMBERLIST}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error, warn}; use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE}; use scan::{check_transaction_alone, compute_partial_tweak_to_transaction}; use sdk_common::network::{AnkFlag, NewTxMessage}; use sdk_common::{ network::HandshakeMessage, pcd::Member, process::{lock_processes, Process, CACHEDPROCESSES}, serialization::{OutPointMemberMap, OutPointProcessMap}, silentpayments::SpWallet, sp_client::{ bitcoin::{ consensus::deserialize, hex::{DisplayHex, FromHex}, Amount, Network, Transaction, }, silentpayments::SilentPaymentAddress, }, MutexExt, }; use sdk_common::{ sp_client::{ bitcoin::{secp256k1::rand::thread_rng, OutPoint}, SpClient, SpendKey, }, updates::{init_update_sink, NativeUpdateSink, StateUpdate}, }; use serde_json::Value; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_tungstenite::tungstenite::Message; use anyhow::{Error, Result}; use zeromq::{Socket, SocketRecv}; mod commit; mod config; mod daemon; mod faucet; mod message; mod scan; use crate::config::Config; use crate::{ daemon::{Daemon, RpcCall}, scan::scan_blocks, }; pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case type Tx = UnboundedSender; type PeerMap = Mutex>; pub(crate) static PEERMAP: OnceLock = OnceLock::new(); pub(crate) static DAEMON: OnceLock>> = OnceLock::new(); static CHAIN_TIP: AtomicU32 = AtomicU32::new(0); pub static FREEZED_UTXOS: OnceLock>> = OnceLock::new(); pub fn lock_freezed_utxos() -> Result>, Error> { FREEZED_UTXOS .get_or_init(|| Mutex::new(HashSet::new())) .lock_anyhow() } #[derive(Debug)] pub struct StateFile { path: PathBuf, } impl StateFile { fn new(path: PathBuf) -> Self { Self { path } } fn create(&self) -> Result<()> { let parent: PathBuf; if let Some(dir) = self.path.parent() { if !dir.ends_with(".4nk") { return Err(Error::msg("parent dir must be \".4nk\"")); } parent = dir.to_path_buf(); } else { return Err(Error::msg("wallet file has no parent dir")); } // Ensure the parent directory exists if !parent.exists() { fs::create_dir_all(parent)?; } // Create a new file fs::File::create(&self.path)?; Ok(()) } fn save(&self, json: &Value) -> Result<()> { let mut f = fs::File::options() .write(true) .truncate(true) .open(&self.path)?; let stringified = serde_json::to_string(&json)?; let bin = stringified.as_bytes(); f.write_all(bin)?; Ok(()) } fn load(&self) -> Result { let mut f = fs::File::open(&self.path)?; let mut content = vec![]; f.read_to_end(&mut content)?; let res: Value = serde_json::from_slice(&content)?; Ok(res) } } #[derive(Debug)] pub struct DiskStorage { pub wallet_file: StateFile, pub processes_file: StateFile, pub members_file: StateFile, } pub static STORAGE: OnceLock> = OnceLock::new(); const FAUCET_AMT: Amount = Amount::from_sat(10_000); pub(crate) static WALLET: OnceLock> = OnceLock::new(); fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> { let tx = deserialize::(&Vec::from_hex(&new_tx_msg.transaction)?)?; let daemon = DAEMON.get().unwrap().lock_anyhow()?; daemon.test_mempool_accept(&tx)?; daemon.broadcast(&tx)?; Ok(()) } async fn handle_connection( raw_stream: TcpStream, addr: SocketAddr, our_sp_address: SilentPaymentAddress, ) { debug!("Incoming TCP connection from: {}", addr); let peers = PEERMAP.get().expect("Peer Map not initialized"); let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { Ok(stream) => { debug!("WebSocket connection established"); stream } Err(e) => { log::error!("WebSocket handshake failed for {}: {}", addr, e); return; } }; // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded_channel(); match peers.lock_anyhow() { Ok(mut peer_map) => peer_map.insert(addr, tx), Err(e) => { log::error!("{}", e); panic!(); } }; let processes = lock_processes().unwrap().clone(); let members = lock_members().unwrap().clone(); let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst); let init_msg = HandshakeMessage::new( our_sp_address.to_string(), OutPointMemberMap(members), OutPointProcessMap(processes), current_tip.into(), ); if let Err(e) = broadcast_message( AnkFlag::Handshake, format!("{}", init_msg.to_string()), BroadcastType::Sender(addr), ) { log::error!("Failed to send init message: {}", e); return; } let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming.try_for_each(|msg| { if let Ok(raw_msg) = msg.to_text() { // debug!("Received msg: {}", raw_msg); process_message(raw_msg, addr); } else { debug!("Received non-text message {} from peer {}", msg, addr); } future::ok(()) }); let receive_from_others = UnboundedReceiverStream::new(rx) .map(Ok) .forward(outgoing) .map(|result| { if let Err(e) = result { debug!("Error sending message: {}", e); } }); pin_mut!(broadcast_incoming, receive_from_others); future::select(broadcast_incoming, receive_from_others).await; debug!("{} disconnected", &addr); peers.lock().unwrap().remove(&addr); } fn create_new_tx_message(transaction: Vec) -> Result { let tx: Transaction = deserialize(&transaction)?; if tx.is_coinbase() { return Err(Error::msg("Can't process coinbase transaction")); } let partial_tweak = compute_partial_tweak_to_transaction(&tx)?; let sp_wallet = WALLET .get() .ok_or_else(|| Error::msg("Wallet not initialized"))? .lock_anyhow()?; check_transaction_alone(sp_wallet, &tx, &partial_tweak)?; Ok(NewTxMessage::new( transaction.to_lower_hex_string(), Some(partial_tweak.to_string()), )) } async fn handle_scan_updates( scan_rx: std::sync::mpsc::Receiver, ) { while let Ok(update) = scan_rx.recv() { log::debug!("Received scan update: {:?}", update); } } async fn handle_state_updates( state_rx: std::sync::mpsc::Receiver, ) { while let Ok(update) = state_rx.recv() { match update { StateUpdate::Update { blkheight, blkhash, found_outputs, found_inputs, } => { // We update the wallet with found outputs and inputs let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); // inputs first for outpoint in found_inputs { sp_wallet.mark_output_mined(&outpoint, blkhash); } sp_wallet.get_mut_outputs().extend(found_outputs); sp_wallet.set_last_scan(blkheight.to_consensus_u32()); let json = serde_json::to_value(sp_wallet.clone()).unwrap(); STORAGE .get() .unwrap() .lock_anyhow() .unwrap() .wallet_file .save(&json) .unwrap(); } StateUpdate::NoUpdate { blkheight } => { // We just keep the current height to update the last_scan debug!("No update, setting last scan at {}", blkheight); let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap(); sp_wallet.set_last_scan(blkheight.to_consensus_u32()); let json = serde_json::to_value(sp_wallet.clone()).unwrap(); STORAGE .get() .unwrap() .lock_anyhow() .unwrap() .wallet_file .save(&json) .unwrap(); } } } } async fn handle_zmq(zmq_url: String, blindbit_url: String) { debug!("Starting listening on Core"); let mut socket = zeromq::SubSocket::new(); socket.connect(&zmq_url).await.unwrap(); socket.subscribe("rawtx").await.unwrap(); socket.subscribe("hashblock").await.unwrap(); loop { let core_msg = match socket.recv().await { Ok(m) => m, Err(e) => { error!("Zmq error: {}", e); continue; } }; debug!("Received a message"); let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1)) { debug!("topic: {}", std::str::from_utf8(&topic).unwrap()); match std::str::from_utf8(&topic) { Ok("rawtx") => match create_new_tx_message(data.to_vec()) { Ok(m) => { debug!("Created message"); serde_json::to_string(&m).expect("This shouldn't fail") } Err(e) => { error!("{}", e); continue; } }, Ok("hashblock") => { let current_height = DAEMON .get() .unwrap() .lock_anyhow() .unwrap() .get_current_height() .unwrap(); CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst); // Add retry logic for hashblock processing let mut retry_count = 0; const MAX_RETRIES: u32 = 4; const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay loop { match scan_blocks(0, &blindbit_url).await { Ok(_) => { debug!("Successfully scanned blocks after {} retries", retry_count); break; } Err(e) => { retry_count += 1; if retry_count >= MAX_RETRIES { error!( "Failed to scan blocks after {} retries: {}", MAX_RETRIES, e ); break; } // Exponential backoff: 1s, 2s, 4s let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1)); warn!( "Scan failed (attempt {}/{}), retrying in {}ms: {}", retry_count, MAX_RETRIES, delay_ms, e ); tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)) .await; } } } continue; } _ => { error!("Unexpected message in zmq"); continue; } } } else { error!("Empty message"); continue; }; if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) { log::error!("{}", e.to_string()); } } } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { env_logger::init(); // todo: take the path to conf file as argument // default to "./.conf" let config = Config::read_from_file(".conf")?; if config.network == Network::Bitcoin { warn!("Running on mainnet, you're on your own"); } MESSAGECACHE .set(MessageCache::new()) .expect("Message Cache initialization failed"); PEERMAP .set(PeerMap::new(HashMap::new())) .expect("PeerMap initialization failed"); // Connect the rpc daemon with retry logic let mut retry_count = 0; const MAX_RETRIES: u32 = 5; const RETRY_DELAY_MS: u64 = 2000; // 2 seconds initial delay let daemon = loop { let cookie_path = config.cookie_path.as_ref().map(|p| PathBuf::from(p)); match Daemon::connect( config.core_wallet.clone(), config.core_url.clone(), config.network, cookie_path, ) { Ok(daemon) => break daemon, Err(e) => { retry_count += 1; if retry_count >= MAX_RETRIES { return Err(e.context("Failed to connect to Bitcoin Core after multiple attempts")); } log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}", retry_count, MAX_RETRIES, e); std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS * retry_count as u64)); } } }; DAEMON .set(Mutex::new(Box::new(daemon))) .expect("DAEMON initialization failed"); let current_tip: u32 = DAEMON .get() .unwrap() .lock_anyhow()? .get_current_height()? .try_into()?; // Set CHAIN_TIP CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst); let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?; app_dir.push(config.data_dir); let mut wallet_file = app_dir.clone(); wallet_file.push(&config.wallet_name); let mut processes_file = app_dir.clone(); processes_file.push("processes"); let mut members_file = app_dir.clone(); members_file.push("members"); let wallet_file = StateFile::new(wallet_file); let processes_file = StateFile::new(processes_file); let members_file = StateFile::new(members_file); // load an existing sp_wallet, or create a new one let sp_wallet: SpWallet = match wallet_file.load() { Ok(wallet) => { // TODO: Verify the wallet is compatible with the current network serde_json::from_value(wallet)? } Err(_) => { // Create a new wallet file if it doesn't exist or fails to load wallet_file.create()?; let mut rng = thread_rng(); let new_client = SpClient::new( SecretKey::new(&mut rng), SpendKey::Secret(SecretKey::new(&mut rng)), config.network, ) .expect("Failed to create a new SpClient"); let mut sp_wallet = SpWallet::new(new_client); // Set birthday and update scan information sp_wallet.set_birthday(current_tip); sp_wallet.set_last_scan(current_tip); // Save the newly created wallet to disk let json = serde_json::to_value(sp_wallet.clone())?; wallet_file.save(&json)?; sp_wallet } }; let cached_processes: HashMap = match processes_file.load() { Ok(processes) => { let deserialized: OutPointProcessMap = serde_json::from_value(processes)?; deserialized.0 } Err(_) => { debug!("creating process file at {}", processes_file.path.display()); processes_file.create()?; HashMap::new() } }; let members: HashMap = match members_file.load() { Ok(members) => { let deserialized: OutPointMemberMap = serde_json::from_value(members)?; deserialized.0 } Err(_) => { debug!("creating members file at {}", members_file.path.display()); members_file.create()?; HashMap::new() } }; { let utxo_to_freeze: HashSet = cached_processes .iter() .map(|(_, process)| process.get_last_unspent_outpoint().unwrap()) .collect(); let mut freezed_utxos = lock_freezed_utxos()?; *freezed_utxos = utxo_to_freeze; } let our_sp_address = sp_wallet.get_sp_client().get_receiving_address(); log::info!("Using wallet with address {}", our_sp_address,); log::info!( "Found {} outputs for a total balance of {}", sp_wallet.get_outputs().len(), sp_wallet.get_balance() ); let last_scan = sp_wallet.get_last_scan(); WALLET .set(Mutex::new(sp_wallet)) .expect("Failed to initialize WALLET"); CACHEDPROCESSES .set(Mutex::new(cached_processes)) .expect("Failed to initialize CACHEDPROCESSES"); MEMBERLIST .set(Mutex::new(members)) .expect("Failed to initialize MEMBERLIST"); let storage = DiskStorage { wallet_file, processes_file, members_file, }; STORAGE.set(Mutex::new(storage)).unwrap(); let (sink, scan_rx, state_rx) = NativeUpdateSink::new(); init_update_sink(Arc::new(sink)); // Spawn the update handlers tokio::spawn(handle_scan_updates(scan_rx)); tokio::spawn(handle_state_updates(state_rx)); if last_scan < current_tip { log::info!("Scanning for our outputs"); scan_blocks(current_tip - last_scan, &config.blindbit_url).await?; } // Subscribe to Bitcoin Core let zmq_url = config.zmq_url.clone(); let blindbit_url = config.blindbit_url.clone(); tokio::spawn(async move { handle_zmq(zmq_url, blindbit_url).await; }); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(config.ws_url).await; let listener = try_socket.expect("Failed to bind"); tokio::spawn(MessageCache::clean_up()); // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection(stream, addr, our_sp_address)); } Ok(()) }