sdk_relay/src/main.rs
Nicolas Cantu 3b636cef91 feat: Ajout du support des relais externes via external_nodes.conf
- Ajout de la fonction load_external_config() pour charger la configuration externe
- Ajout de la fonction parse_external_config() pour parser le fichier TOML
- Modification de discover_relays() pour inclure les relais externes
- Support des relais avec ancienne version (0.9.0) et capacités limitées
- Ajout du fichier EXEMPLES_PRATIQUES.md avec exemples d'utilisation
- Mise à jour de la documentation technique
2025-08-27 14:00:53 +02:00

649 lines
20 KiB
Rust

use std::{
collections::{HashMap, HashSet},
env,
fmt::Debug,
fs,
io::{Read, Write},
net::SocketAddr,
path::PathBuf,
str::FromStr,
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
};
use bitcoincore_rpc::{
bitcoin::secp256k1::SecretKey,
json::{self as bitcoin_json},
};
use commit::{lock_members, MEMBERLIST};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error, warn};
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
use sdk_common::network::{AnkFlag, NewTxMessage};
use sdk_common::{
network::HandshakeMessage,
pcd::Member,
process::{lock_processes, Process, CACHEDPROCESSES},
serialization::{OutPointMemberMap, OutPointProcessMap},
silentpayments::SpWallet,
sp_client::{
bitcoin::{
consensus::deserialize,
hex::{DisplayHex, FromHex},
Amount, Network, Transaction,
},
silentpayments::SilentPaymentAddress,
},
MutexExt,
};
use sdk_common::{
sp_client::{
bitcoin::{secp256k1::rand::thread_rng, OutPoint},
SpClient, SpendKey,
},
updates::{init_update_sink, NativeUpdateSink, StateUpdate},
};
use serde_json::Value;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
use anyhow::{Error, Result};
use zeromq::{Socket, SocketRecv};
mod commit;
mod config;
mod daemon;
mod faucet;
mod message;
mod scan;
mod sync;
use crate::config::Config;
use crate::{
daemon::{Daemon, RpcCall},
scan::scan_blocks,
sync::{get_sync_manager, SYNC_MANAGER, start_sync_test_loop},
};
pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
/// Sending half of the unbounded channel used to queue WebSocket messages for one peer.
type Tx = UnboundedSender<Message>;
/// Connected peers, keyed by socket address, each mapped to its outgoing message queue.
type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
/// Global peer registry; initialized once in `main`, updated by `handle_connection`.
pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
/// Global RPC handle to the Bitcoin Core daemon; initialized once in `main` after a successful connection.
pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
/// Last known chain height; written in `main` and on every `hashblock` notification,
/// read when building the handshake sent to a new peer.
static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
/// Set of outpoints considered frozen; lazily created by `lock_freezed_utxos` and
/// populated in `main` with the last unspent outpoint of every cached process.
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
/// Locks the global set of frozen UTXOs, creating an empty set on first use.
///
/// # Errors
/// Returns whatever error `lock_anyhow` reports when the mutex cannot be acquired.
pub fn lock_freezed_utxos() -> Result<MutexGuard<'static, HashSet<OutPoint>>, Error> {
    // The set is created lazily so callers never have to care about init order.
    let frozen = FREEZED_UTXOS.get_or_init(Mutex::default);
    frozen.lock_anyhow()
}
#[derive(Debug)]
pub struct StateFile {
path: PathBuf,
}
impl StateFile {
fn new(path: PathBuf) -> Self {
Self { path }
}
fn create(&self) -> Result<()> {
let parent: PathBuf;
if let Some(dir) = self.path.parent() {
if !dir.ends_with(".4nk") {
return Err(Error::msg("parent dir must be \".4nk\""));
}
parent = dir.to_path_buf();
} else {
return Err(Error::msg("wallet file has no parent dir"));
}
// Ensure the parent directory exists
if !parent.exists() {
fs::create_dir_all(parent)?;
}
// Create a new file
fs::File::create(&self.path)?;
Ok(())
}
fn save(&self, json: &Value) -> Result<()> {
let mut f = fs::File::options()
.write(true)
.truncate(true)
.open(&self.path)?;
let stringified = serde_json::to_string(&json)?;
let bin = stringified.as_bytes();
f.write_all(bin)?;
Ok(())
}
fn load(&self) -> Result<Value> {
let mut f = fs::File::open(&self.path)?;
let mut content = vec![];
f.read_to_end(&mut content)?;
let res: Value = serde_json::from_slice(&content)?;
Ok(res)
}
}
/// Groups the on-disk state files used by the relay.
#[derive(Debug)]
pub struct DiskStorage {
// File holding the serialized wallet.
pub wallet_file: StateFile,
// File holding the cached processes map.
pub processes_file: StateFile,
// File holding the members map.
pub members_file: StateFile,
}
/// Global handle on the on-disk state files; initialized once in `main`.
pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
/// Fixed amount of 10,000 satoshis.
// NOTE(review): not referenced in this file; presumably consumed by the `faucet` module — confirm.
const FAUCET_AMT: Amount = Amount::from_sat(10_000);
/// Global silent-payment wallet; initialized once in `main`, locked by the
/// transaction and state-update handlers.
pub(crate) static WALLET: OnceLock<Mutex<SpWallet>> = OnceLock::new();
fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> {
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
daemon.test_mempool_accept(&tx)?;
daemon.broadcast(&tx)?;
Ok(())
}
async fn handle_connection(
raw_stream: TcpStream,
addr: SocketAddr,
our_sp_address: SilentPaymentAddress,
) {
debug!("Incoming TCP connection from: {}", addr);
let peers = PEERMAP.get().expect("Peer Map not initialized");
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
Ok(stream) => {
debug!("WebSocket connection established");
stream
}
Err(e) => {
log::error!("WebSocket handshake failed for {}: {}", addr, e);
return;
}
};
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
match peers.lock_anyhow() {
Ok(mut peer_map) => peer_map.insert(addr, tx),
Err(e) => {
log::error!("{}", e);
panic!();
}
};
let processes = lock_processes().unwrap().clone();
let members = lock_members().unwrap().clone();
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
let init_msg = HandshakeMessage::new(
our_sp_address.to_string(),
OutPointMemberMap(members),
OutPointProcessMap(processes),
current_tip.into(),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::Sender(addr),
) {
log::error!("Failed to send init message: {}", e);
return;
}
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
if let Ok(raw_msg) = msg.to_text() {
// debug!("Received msg: {}", raw_msg);
process_message(raw_msg, addr);
} else {
debug!("Received non-text message {} from peer {}", msg, addr);
}
future::ok(())
});
let receive_from_others = UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(outgoing)
.map(|result| {
if let Err(e) = result {
debug!("Error sending message: {}", e);
}
});
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peers.lock().unwrap().remove(&addr);
}
fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
let tx: Transaction = deserialize(&transaction)?;
if tx.is_coinbase() {
return Err(Error::msg("Can't process coinbase transaction"));
}
let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
let sp_wallet = WALLET
.get()
.ok_or_else(|| Error::msg("Wallet not initialized"))?
.lock_anyhow()?;
check_transaction_alone(sp_wallet, &tx, &partial_tweak)?;
Ok(NewTxMessage::new(
transaction.to_lower_hex_string(),
Some(partial_tweak.to_string()),
))
}
async fn handle_scan_updates(
scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
) {
while let Ok(update) = scan_rx.recv() {
log::debug!("Received scan update: {:?}", update);
}
}
async fn handle_state_updates(
state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
) {
while let Ok(update) = state_rx.recv() {
match update {
StateUpdate::Update {
blkheight,
blkhash,
found_outputs,
found_inputs,
} => {
// We update the wallet with found outputs and inputs
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
// inputs first
for outpoint in found_inputs {
sp_wallet.mark_output_mined(&outpoint, blkhash);
}
sp_wallet.get_mut_outputs().extend(found_outputs);
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
STORAGE
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.wallet_file
.save(&json)
.unwrap();
}
StateUpdate::NoUpdate { blkheight } => {
// We just keep the current height to update the last_scan
debug!("No update, setting last scan at {}", blkheight);
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
STORAGE
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.wallet_file
.save(&json)
.unwrap();
}
}
}
}
async fn handle_zmq(zmq_url: String, blindbit_url: String) {
debug!("Starting listening on Core");
let mut socket = zeromq::SubSocket::new();
socket.connect(&zmq_url).await.unwrap();
socket.subscribe("rawtx").await.unwrap();
socket.subscribe("hashblock").await.unwrap();
loop {
let core_msg = match socket.recv().await {
Ok(m) => m,
Err(e) => {
error!("Zmq error: {}", e);
continue;
}
};
debug!("Received a message");
let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1))
{
debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
match std::str::from_utf8(&topic) {
Ok("rawtx") => match create_new_tx_message(data.to_vec()) {
Ok(m) => {
debug!("Created message");
serde_json::to_string(&m).expect("This shouldn't fail")
}
Err(e) => {
error!("{}", e);
continue;
}
},
Ok("hashblock") => {
let current_height = DAEMON
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.get_current_height()
.unwrap();
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
// Add retry logic for hashblock processing
let mut retry_count = 0;
const MAX_RETRIES: u32 = 4;
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
loop {
match scan_blocks(0, &blindbit_url).await {
Ok(_) => {
debug!("Successfully scanned blocks after {} retries", retry_count);
break;
}
Err(e) => {
retry_count += 1;
if retry_count >= MAX_RETRIES {
error!(
"Failed to scan blocks after {} retries: {}",
MAX_RETRIES, e
);
break;
}
// Exponential backoff: 1s, 2s, 4s
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
warn!(
"Scan failed (attempt {}/{}), retrying in {}ms: {}",
retry_count, MAX_RETRIES, delay_ms, e
);
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
.await;
}
}
}
continue;
}
_ => {
error!("Unexpected message in zmq");
continue;
}
}
} else {
error!("Empty message");
continue;
};
if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) {
log::error!("{}", e.to_string());
}
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
env_logger::init();
// todo: take the path to conf file as argument
// default to "./.conf"
let config = Config::read_from_file(".conf")?;
if config.network == Network::Bitcoin {
warn!("Running on mainnet, you're on your own");
}
MESSAGECACHE
.set(MessageCache::new())
.expect("Message Cache initialization failed");
PEERMAP
.set(PeerMap::new(HashMap::new()))
.expect("PeerMap initialization failed");
// Connect the rpc daemon with retry logic
let mut retry_count = 0;
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY_MS: u64 = 2000; // 2 seconds initial delay
let daemon = loop {
let cookie_path = config.cookie_path.as_ref().map(|p| PathBuf::from(p));
match Daemon::connect(
config.core_wallet.clone(),
config.core_url.clone(),
config.network,
cookie_path,
) {
Ok(daemon) => break daemon,
Err(e) => {
retry_count += 1;
if retry_count >= MAX_RETRIES {
return Err(e.context("Failed to connect to Bitcoin Core after multiple attempts"));
}
log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}", retry_count, MAX_RETRIES, e);
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS * retry_count as u64));
}
}
};
DAEMON
.set(Mutex::new(Box::new(daemon)))
.expect("DAEMON initialization failed");
let current_tip: u32 = DAEMON
.get()
.unwrap()
.lock_anyhow()?
.get_current_height()?
.try_into()?;
// Set CHAIN_TIP
CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
app_dir.push(config.data_dir);
let mut wallet_file = app_dir.clone();
wallet_file.push(&config.wallet_name);
let mut processes_file = app_dir.clone();
processes_file.push("processes");
let mut members_file = app_dir.clone();
members_file.push("members");
let wallet_file = StateFile::new(wallet_file);
let processes_file = StateFile::new(processes_file);
let members_file = StateFile::new(members_file);
// load an existing sp_wallet, or create a new one
let sp_wallet: SpWallet = match wallet_file.load() {
Ok(wallet) => {
// TODO: Verify the wallet is compatible with the current network
serde_json::from_value(wallet)?
}
Err(_) => {
// Create a new wallet file if it doesn't exist or fails to load
wallet_file.create()?;
let mut rng = thread_rng();
let new_client = SpClient::new(
SecretKey::new(&mut rng),
SpendKey::Secret(SecretKey::new(&mut rng)),
config.network,
)
.expect("Failed to create a new SpClient");
let mut sp_wallet = SpWallet::new(new_client);
// Set birthday and update scan information
sp_wallet.set_birthday(current_tip);
sp_wallet.set_last_scan(current_tip);
// Save the newly created wallet to disk
let json = serde_json::to_value(sp_wallet.clone())?;
wallet_file.save(&json)?;
sp_wallet
}
};
let cached_processes: HashMap<OutPoint, Process> = match processes_file.load() {
Ok(processes) => {
let deserialized: OutPointProcessMap = serde_json::from_value(processes)?;
deserialized.0
}
Err(_) => {
debug!("creating process file at {}", processes_file.path.display());
processes_file.create()?;
HashMap::new()
}
};
let members: HashMap<OutPoint, Member> = match members_file.load() {
Ok(members) => {
let deserialized: OutPointMemberMap = serde_json::from_value(members)?;
deserialized.0
}
Err(_) => {
debug!("creating members file at {}", members_file.path.display());
members_file.create()?;
HashMap::new()
}
};
{
let utxo_to_freeze: HashSet<OutPoint> = cached_processes
.iter()
.map(|(_, process)| process.get_last_unspent_outpoint().unwrap())
.collect();
let mut freezed_utxos = lock_freezed_utxos()?;
*freezed_utxos = utxo_to_freeze;
}
let our_sp_address = sp_wallet.get_sp_client().get_receiving_address();
log::info!("Using wallet with address {}", our_sp_address,);
log::info!(
"Found {} outputs for a total balance of {}",
sp_wallet.get_outputs().len(),
sp_wallet.get_balance()
);
let last_scan = sp_wallet.get_last_scan();
WALLET
.set(Mutex::new(sp_wallet))
.expect("Failed to initialize WALLET");
CACHEDPROCESSES
.set(Mutex::new(cached_processes))
.expect("Failed to initialize CACHEDPROCESSES");
MEMBERLIST
.set(Mutex::new(members))
.expect("Failed to initialize MEMBERLIST");
let storage = DiskStorage {
wallet_file,
processes_file,
members_file,
};
STORAGE.set(Mutex::new(storage)).unwrap();
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
init_update_sink(Arc::new(sink));
// Spawn the update handlers
tokio::spawn(handle_scan_updates(scan_rx));
tokio::spawn(handle_state_updates(state_rx));
if last_scan < current_tip {
log::info!("Scanning for our outputs");
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
}
// Subscribe to Bitcoin Core
let zmq_url = config.zmq_url.clone();
let blindbit_url = config.blindbit_url.clone();
tokio::spawn(async move {
handle_zmq(zmq_url, blindbit_url).await;
});
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(config.ws_url).await;
let listener = try_socket.expect("Failed to bind");
tokio::spawn(MessageCache::clean_up());
// Initialize the sync manager
let sync_manager = sync::SyncManager::new();
SYNC_MANAGER.set(sync_manager).unwrap();
// Start the sync manager cleanup task
let sync_manager = get_sync_manager();
tokio::spawn(sync_manager.cleanup_cache());
// Start the periodic sync task
let sync_manager = get_sync_manager();
tokio::spawn(sync_manager.start_periodic_sync());
// Découverte automatique des relais
let sync_manager = get_sync_manager();
tokio::spawn(async move {
// Attendre un peu avant de commencer la découverte
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
if let Err(e) = sync_manager.discover_relays().await {
log::error!("Erreur lors de la découverte des relais: {}", e);
}
});
// Start the sync test loop (optionnel, pour démonstration)
if std::env::var("ENABLE_SYNC_TEST").is_ok() {
tokio::spawn(start_sync_test_loop());
}
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr, our_sp_address));
}
Ok(())
}