use std::{ collections::HashMap, env, fmt::Debug, fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::{Arc, Mutex, MutexGuard, OnceLock}, time::{Duration, Instant}, }; use bitcoincore_rpc::json::{self as bitcoin_json}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use log::{debug, error, info, warn}; use sdk_common::sp_client::bitcoin::{ absolute::LockTime, consensus::{deserialize, serialize}, hex::{DisplayHex, FromHex}, key::TapTweak, script::PushBytesBuf, sighash::{Prevouts, SighashCache}, taproot::Signature, transaction::Version, Amount, Network, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness, XOnlyPublicKey, }; use sdk_common::sp_client::{ bitcoin::secp256k1::{ rand::{thread_rng, Rng}, Keypair, Message as Secp256k1Message, PublicKey, Secp256k1, ThirtyTwoByteHash, }, spclient::SpWallet, }; use sdk_common::{ error::AnkError, network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}, silentpayments::create_transaction_for_address_with_shared_secret, }; use sdk_common::sp_client::db::{JsonFile, Storage}; use sdk_common::sp_client::silentpayments::sending::{ generate_recipient_pubkeys, SilentPaymentAddress, }; use sdk_common::sp_client::silentpayments::utils::receiving::{ calculate_tweak_data, get_pubkey_from_input, }; use sdk_common::sp_client::silentpayments::utils::sending::calculate_partial_secret; use sdk_common::sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::{ net::{TcpListener, TcpStream}, time, }; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_tungstenite::tungstenite::Message; use anyhow::{Error, Result}; use zeromq::{Socket, SocketRecv}; mod daemon; mod electrumclient; mod scan; use crate::{daemon::Daemon, scan::scan_blocks}; type Tx = UnboundedSender; type PeerMap = Arc>>; type SharedDaemon = Arc>; static MESSAGECACHE: OnceLock = OnceLock::new(); const MESSAGECACHEDURATION: Duration = Duration::from_secs(10); const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(2); #[derive(Debug)] struct MessageCache { store: Mutex>, } impl MessageCache { fn new() -> Self { Self { store: Mutex::new(HashMap::new()), } } fn insert(&self, key: String) { let mut store = self.store.lock().unwrap(); store.insert(key.clone(), Instant::now()); } fn contains(&self, key: &str) -> bool { let store = self.store.lock().unwrap(); store.contains_key(key) } } async fn clean_up() { let cache = MESSAGECACHE.get().unwrap(); let mut interval = time::interval(MESSAGECACHEINTERVAL); loop { interval.tick().await; let mut store = cache.store.lock().unwrap(); let now = Instant::now(); let to_rm: Vec = store .iter() .filter_map(|(entry, entrytime)| { if let Some(duration) = now.checked_duration_since(*entrytime) { if duration > MESSAGECACHEDURATION { Some(entry.clone()) } else { None } } else { None } }) .collect(); for key in to_rm { store.remove(&key); } } } const FAUCET_AMT: Amount = Amount::from_sat(100_000); pub(crate) trait MutexExt { fn lock_anyhow(&self) -> Result, Error>; } impl MutexExt for Mutex { fn lock_anyhow(&self) -> Result, Error> { self.lock() .map_err(|e| Error::msg(format!("Failed to lock: {}", e))) } } #[derive(Debug)] struct SilentPaymentWallet { sp_wallet: Mutex, storage: Mutex, } impl SilentPaymentWallet { pub fn get_wallet(&self) -> Result> { self.sp_wallet.lock_anyhow() } pub fn save(&self) -> Result<()> { self.storage.lock_anyhow()?.save(&self.sp_wallet) } } enum BroadcastType { Sender(SocketAddr), #[allow(dead_code)] ExcludeSender(SocketAddr), #[allow(dead_code)] ToAll, } fn broadcast_message( peers: PeerMap, flag: AnkFlag, payload: String, broadcast: BroadcastType, ) -> Result<()> { let ank_msg = AnkNetworkMsg { flag, content: payload, }; let msg = Message::Text(serde_json::to_string(&ank_msg)?); log::debug!("Broadcasting message: {}", msg); match broadcast { BroadcastType::Sender(addr) => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .find(|(peer_addr, _)| peer_addr == &&addr) .ok_or(Error::msg("Failed to find the sender in the peer_map"))? .1 .send(msg)?; } BroadcastType::ExcludeSender(addr) => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .filter(|(peer_addr, _)| peer_addr != &&addr) .for_each(|(_, peer_tx)| { let _ = peer_tx.send(msg.clone()); }); } BroadcastType::ToAll => { peers .lock() .map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))? .iter() .for_each(|(_, peer_tx)| { let _ = peer_tx.send(msg.clone()); }); } } Ok(()) } fn spend_from_core( dest: XOnlyPublicKey, daemon: Arc>, ) -> Result<(Transaction, Amount)> { let core = daemon .lock() .map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?; let unspent_list: Vec = core.list_unspent_from_to(None)?; if !unspent_list.is_empty() { let network = core.get_network()?; let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked()); let new_psbt = core.create_psbt(&unspent_list, spk, network)?; let processed_psbt = core.process_psbt(new_psbt)?; let finalize_psbt_result = core.finalize_psbt(processed_psbt)?; let final_psbt = Psbt::from_str(&finalize_psbt_result)?; let total_fee = final_psbt.fee()?; let final_tx = final_psbt.extract_tx()?; let fee_rate = total_fee .checked_div(final_tx.weight().to_vbytes_ceil()) .unwrap(); Ok((final_tx, fee_rate)) } else { // we don't have enough available coins to pay for this faucet request Err(Error::msg("No spendable outputs")) } } fn faucet_send( sp_address: SilentPaymentAddress, commitment: &str, sp_wallet: Arc, shared_daemon: SharedDaemon, ) -> Result { let mut first_tx: Option = None; let final_tx: Transaction; // do we have a sp output available ? let available_outpoints = sp_wallet.get_wallet()?.get_outputs().to_spendable_list(); let available_amt = available_outpoints .iter() .fold(Amount::from_sat(0), |acc, (_, x)| acc + x.amount); // If we don't have at least 4 times the amount we need to send, we take some reserves out if available_amt > FAUCET_AMT.checked_mul(4).unwrap() { let mut total_amt = Amount::from_sat(0); let mut inputs = HashMap::new(); for (outpoint, output) in available_outpoints { total_amt += output.amount; inputs.insert(outpoint, output); if total_amt >= FAUCET_AMT { break; } } let recipient = Recipient { address: sp_address.into(), amount: FAUCET_AMT, nb_outputs: 1, }; let fee_estimate = shared_daemon .lock_anyhow()? .estimate_fee(6) .unwrap_or(Amount::from_sat(1000)) .checked_div(1000) .unwrap(); log::debug!("fee estimate for 6 blocks: {}", fee_estimate); let wallet = sp_wallet.get_wallet()?; let signed_psbt = create_transaction_for_address_with_shared_secret( recipient, &wallet, Some(commitment), fee_estimate, )?; let psbt = Psbt::from_str(&signed_psbt)?; final_tx = psbt.extract_tx()?; } else { // let's try to spend directly from the mining address let secp = Secp256k1::signing_only(); let keypair = Keypair::new(&secp, &mut thread_rng()); // we first spend from core to the pubkey we just created let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?; // check that the first output of the transaction pays to the key we just created debug_assert!( core_tx.output[0].script_pubkey == ScriptBuf::new_p2tr_tweaked( keypair.x_only_public_key().0.dangerous_assume_tweaked() ) ); // This is ugly and can be streamlined // create a new transaction that spends the newly created UTXO to the sp_address let mut faucet_tx = Transaction { input: vec![TxIn { previous_output: OutPoint::new(core_tx.txid(), 0), ..Default::default() }], output: vec![], version: Version::TWO, lock_time: LockTime::ZERO, }; // now do the silent payment operations with the final recipient address let partial_secret = calculate_partial_secret( &[(keypair.secret_key(), true)], &[(core_tx.txid().to_string(), 0)], )?; let ext_output_key: XOnlyPublicKey = generate_recipient_pubkeys(vec![sp_address.into()], partial_secret)? .into_values() .flatten() .collect::>() .get(0) .expect("Failed to generate keys") .to_owned(); let change_sp_address = sp_wallet.get_wallet()?.get_client().get_receiving_address(); let change_output_key: XOnlyPublicKey = generate_recipient_pubkeys(vec![change_sp_address], partial_secret)? .into_values() .flatten() .collect::>() .get(0) .expect("Failed to generate keys") .to_owned(); let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked()); let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked()); let mut op_return = PushBytesBuf::new(); op_return.extend_from_slice(&Vec::from_hex(commitment)?)?; let data_spk = ScriptBuf::new_op_return(op_return); // Take some margin to pay for the fees if core_tx.output[0].value < FAUCET_AMT * 4 { return Err(Error::msg("Not enough funds")); } let change_amt = core_tx.output[0].value.checked_sub(FAUCET_AMT).unwrap(); faucet_tx.output.push(TxOut { value: FAUCET_AMT, script_pubkey: ext_spk, }); faucet_tx.output.push(TxOut { value: change_amt, script_pubkey: change_spk, }); faucet_tx.output.push(TxOut { value: Amount::from_sat(0), script_pubkey: data_spk, }); // dummy signature only used for fee estimation faucet_tx.input[0].witness.push([1; 64].to_vec()); let abs_fee = fee_rate .checked_mul(faucet_tx.weight().to_vbytes_ceil()) .ok_or_else(|| Error::msg("Fee rate multiplication overflowed"))?; // reset the witness to empty faucet_tx.input[0].witness = Witness::new(); faucet_tx.output[1].value -= abs_fee; let first_tx_outputs = vec![core_tx.output[0].clone()]; let prevouts = Prevouts::All(&first_tx_outputs); let hash_ty = TapSighashType::Default; let mut cache = SighashCache::new(&faucet_tx); let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?; let msg = Secp256k1Message::from_digest(sighash.into_32()); let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng()); let final_sig = Signature { sig, hash_ty }; faucet_tx.input[0].witness.push(final_sig.to_vec()); first_tx = Some(core_tx); final_tx = faucet_tx; } if let Ok(daemon) = shared_daemon.lock() { // broadcast one or two transactions if first_tx.is_some() { daemon.broadcast(&first_tx.unwrap())?; } let txid = daemon.broadcast(&final_tx)?; debug!("Sent tx {}", txid); } else { return Err(Error::msg("Failed to lock daemon")); } Ok(final_tx) } fn handle_faucet_request( msg: &FaucetMessage, sp_wallet: Arc, shared_daemon: SharedDaemon, ) -> Result { let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?; debug!("Sending bootstrap coins to {}", sp_address); // send bootstrap coins to this sp_address let tx = faucet_send( sp_address, &msg.commitment, sp_wallet.clone(), shared_daemon.clone(), )?; // get the tweak let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; // get current blockheight let blkheight: u32 = shared_daemon .lock_anyhow()? .get_current_height()? .try_into()?; // update our sp_client with the change output(s) sp_wallet .get_wallet()? .update_wallet_with_transaction(&tx, blkheight, partial_tweak)?; debug!("{:?}", sp_wallet); debug!("updated the wallet"); // save to disk sp_wallet.save()?; debug!("saved the wallet"); Ok(NewTxMessage::new( serialize(&tx).to_lower_hex_string(), Some(partial_tweak.to_string()), )) } fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> { let tx = deserialize::(&Vec::from_hex(&new_tx_msg.transaction)?)?; let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?; if !mempool_accept.allowed { return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?; } if new_tx_msg.tweak_data.is_none() { // we add the tweak_data let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; new_tx_msg.tweak_data = Some(partial_tweak.to_string()); } // we try to broadcast it shared_daemon.lock_anyhow()?.broadcast(&tx)?; Ok(()) } async fn handle_connection( peers: PeerMap, shared_daemon: SharedDaemon, sp_wallet: Arc, raw_stream: TcpStream, addr: SocketAddr, ) { debug!("Incoming TCP connection from: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream) .await .expect("Error during the websocket handshake occurred"); debug!("WebSocket connection established"); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded_channel(); match peers.lock_anyhow() { Ok(mut peer_map) => peer_map.insert(addr, tx), Err(e) => { log::error!("{}", e); panic!(); } }; let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming.try_for_each(|msg| { let peers = peers.clone(); if let Ok(raw_msg) = msg.to_text() { debug!("Received msg: {}", raw_msg); let cache = MESSAGECACHE.get().expect("Cache should be initialized"); if cache.contains(raw_msg) { debug!("Message already processed, dropping"); return future::ok(()); } else { cache.insert(raw_msg.to_owned()); } let parsed = serde_json::from_str::(raw_msg); match parsed { Ok(ank_msg) => match ank_msg.flag { AnkFlag::Faucet => { debug!("Received a faucet message"); if let Ok(mut content) = serde_json::from_str::(&ank_msg.content) { match handle_faucet_request( &content, sp_wallet.clone(), shared_daemon.clone(), ) { Ok(new_tx_msg) => { debug!( "Obtained new_tx_msg: {}", serde_json::to_string(&new_tx_msg).unwrap() ); } Err(e) => { log::error!("Failed to send faucet tx: {}", e); content.error = Some(e.into()); let payload = serde_json::to_string(&content) .expect("Message type shouldn't fail"); if let Err(e) = broadcast_message( peers.clone(), AnkFlag::Faucet, payload, BroadcastType::Sender(addr), ) { log::error!("Failed to broadcast message: {}", e); } } } } else { log::error!("Invalid content for faucet message"); } } AnkFlag::NewTx => { debug!("Received a new tx message"); if let Ok(mut new_tx_msg) = serde_json::from_str::(&ank_msg.content) { match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) { Ok(new_tx_msg) => { // Repeat the msg to all except sender if let Err(e) = broadcast_message( peers.clone(), AnkFlag::NewTx, serde_json::to_string(&new_tx_msg) .expect("This should not fail"), BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } Err(e) => { log::error!("handle_new_tx_request returned error: {}", e); new_tx_msg.error = Some(e.into()); if let Err(e) = broadcast_message( peers.clone(), AnkFlag::NewTx, serde_json::to_string(&new_tx_msg) .expect("This shouldn't fail"), BroadcastType::Sender(addr), ) { log::error!("Failed to broadcast message: {}", e); } } } } else { log::error!("Invalid content for new_tx message"); } } AnkFlag::Cipher => { // For now we just send it to everyone debug!("Received a cipher message"); if let Err(e) = broadcast_message( peers.clone(), AnkFlag::Cipher, ank_msg.content, BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } AnkFlag::Unknown => { debug!("Received an unknown message"); if let Err(e) = broadcast_message( peers.clone(), AnkFlag::Unknown, ank_msg.content, BroadcastType::ExcludeSender(addr), ) { log::error!("Failed to send message with error: {}", e); } } }, Err(_) => log::error!("Failed to parse network message"), } } else { // we don't care log::debug!("Received non-text message {} from peer {}", msg, addr); } future::ok(()) }); let receive_from_others = UnboundedReceiverStream::new(rx) .map(Ok) .forward(outgoing) .map(|result| { if let Err(e) = result { debug!("Error sending message: {}", e); } }); pin_mut!(broadcast_incoming, receive_from_others); future::select(broadcast_incoming, receive_from_others).await; debug!("{} disconnected", &addr); peers.lock().unwrap().remove(&addr); } fn compute_partial_tweak_to_transaction( tx: &Transaction, daemon: Arc>, ) -> Result { let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len()); let mut pubkeys: Vec = Vec::with_capacity(tx.input.len()); for input in tx.input.iter() { outpoints.push(( input.previous_output.txid.to_string(), input.previous_output.vout, )); let prev_tx = daemon .lock_anyhow()? .get_transaction(&input.previous_output.txid, None) .map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?; if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) { match get_pubkey_from_input( &input.script_sig.to_bytes(), &input.witness.to_vec(), &output.script_pubkey.to_bytes(), ) { Ok(Some(pubkey)) => pubkeys.push(pubkey), Ok(None) => continue, Err(e) => { return Err(Error::msg(format!( "Can't extract pubkey from input: {}", e ))) } } } else { return Err(Error::msg("Transaction with a non-existing input")); } } let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect(); let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?; Ok(partial_tweak) } fn create_new_tx_message(transaction: Vec, daemon: Arc>) -> Result { // debug!("Creating tx message"); let tx: Transaction = deserialize(&transaction)?; if tx.is_coinbase() { return Err(Error::msg("Can't process coinbase transaction")); } let partial_tweak = compute_partial_tweak_to_transaction(&tx, daemon)?; Ok(NewTxMessage::new( transaction.to_lower_hex_string(), Some(partial_tweak.to_string()), )) } async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon) { debug!("Starting listening on Core"); let mut socket = zeromq::SubSocket::new(); socket.connect("tcp://127.0.0.1:29100").await.unwrap(); socket.subscribe("rawtx").await.unwrap(); // socket.subscribe("hashblock"); loop { let core_msg = match socket.recv().await { Ok(m) => m, Err(e) => { error!("Zmq error: {}", e); continue; } }; debug!("Received a message"); let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1)) { // debug!("topic: {}", std::str::from_utf8(&topic).unwrap()); match std::str::from_utf8(&topic) { Ok("rawtx") => match create_new_tx_message(data.to_vec(), shared_daemon.clone()) { Ok(m) => { debug!("Created message"); serde_json::to_string(&m).expect("This shouldn't fail") } Err(e) => { error!("{}", e); continue; } }, Ok("hashblock") => todo!(), _ => { error!("Unexpected message in zmq"); continue; } } } else { error!("Empty message"); continue; }; debug!("Broadcasting message {}", payload); if let Err(e) = broadcast_message(peers.clone(), AnkFlag::NewTx, payload, BroadcastType::ToAll) { log::error!("{}", e.to_string()); } } } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { env_logger::init(); // This is rudimentary, if you change the network don't forget to change rpc_url either, we won't do that for you let rpc_url = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:38332".to_owned()); let listening_addr = env::args() .nth(2) .unwrap_or_else(|| "127.0.0.1:8090".to_string()); let wallet_name = env::args().nth(3).unwrap_or_else(|| "default".to_owned()); let network_arg: String = env::args().nth(4).unwrap_or_else(|| "signet".to_owned()); let core_wallet: Option = env::args().nth(5); let network = Network::from_core_arg(&network_arg)?; if network == Network::Bitcoin { warn!("Running on mainnet, you're on your own"); } MESSAGECACHE .set(MessageCache::new()) .expect("Message Cache initialization failed"); tokio::spawn(clean_up()); let peers = PeerMap::new(Mutex::new(HashMap::new())); // Connect the rpc daemon let shared_daemon = Arc::new(Mutex::new(Daemon::connect(core_wallet, rpc_url, network)?)); let current_tip: u32 = shared_daemon .lock_anyhow()? .get_current_height()? .try_into()?; let mut config_dir = PathBuf::from_str(&env::var("HOME")?)?; config_dir.push(".4nk"); let sp_wallet_file = JsonFile::new(&config_dir, &PathBuf::from_str(&wallet_name)?); fs::create_dir_all(config_dir)?; // load an existing sp_wallet, or create a new one let is_testnet = if network == Network::Bitcoin { false } else { true }; let sp_wallet = match >::load(&sp_wallet_file) { Err(_) => { let mut seed = [0u8; 64]; thread_rng().fill(&mut seed); let (scan_sk, spend_sk) = derive_keys_from_seed(&seed, is_testnet) .expect("Couldn't generate a new sp_wallet"); let new_client = SpClient::new( wallet_name, scan_sk, SpendKey::Secret(spend_sk), None, is_testnet, ) .expect("Failed to create a new SpClient"); let mut wallet = SpWallet::new(new_client, None)?; // set birthday to avoid unnecessary scanning let outputs = wallet.get_mut_outputs(); outputs.set_birthday(current_tip); outputs.update_last_scan(current_tip); wallet } Ok(wallet) => wallet, }; log::info!( "Using wallet {} with address {}", sp_wallet.get_client().label, sp_wallet.get_client().get_receiving_address() ); log::info!( "Found {} outputs for a total balance of {}", sp_wallet.get_outputs().to_spendable_list().len(), sp_wallet.get_outputs().get_balance() ); let last_scan = sp_wallet.get_outputs().get_last_scan(); let shared_sp_wallet = Mutex::new(sp_wallet); let shared_wallet_storage = Mutex::new(sp_wallet_file); let sp_wallet = Arc::new(SilentPaymentWallet { sp_wallet: shared_sp_wallet, storage: shared_wallet_storage, }); sp_wallet.save()?; if last_scan < current_tip { log::info!("Scanning for our outputs"); scan_blocks( shared_daemon.clone(), sp_wallet.clone(), current_tip - last_scan, )?; } // Subscribe to Bitcoin Core tokio::spawn(handle_zmq(peers.clone(), shared_daemon.clone())); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&listening_addr).await; let listener = try_socket.expect("Failed to bind"); debug!("Listening on: {}", listening_addr); // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection( peers.clone(), shared_daemon.clone(), sp_wallet.clone(), stream, addr, )); } Ok(()) }