
All checks were successful
build-and-push-ext / build_push (push) Successful in 1m25s
- Ajout de la gestion des fichiers vides dans StateFile::load() - Résolution de l'erreur 'invalid type: sequence, expected a map' - sdk_relay peut maintenant démarrer avec des données vides
719 lines
23 KiB
Rust
719 lines
23 KiB
Rust
use std::{
|
|
collections::{HashMap, HashSet},
|
|
env,
|
|
fmt::Debug,
|
|
fs,
|
|
io::{Read, Write},
|
|
net::SocketAddr,
|
|
path::PathBuf,
|
|
str::FromStr,
|
|
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
|
|
};
|
|
|
|
use bitcoincore_rpc::{
|
|
bitcoin::secp256k1::SecretKey,
|
|
json::{self as bitcoin_json},
|
|
};
|
|
use commit::{lock_members, MEMBERLIST};
|
|
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt, SinkExt};
|
|
use log::{debug, error, warn};
|
|
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
|
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
|
|
use sdk_common::network::{AnkFlag, NewTxMessage};
|
|
use sdk_common::{
|
|
network::HandshakeMessage,
|
|
pcd::Member,
|
|
process::{lock_processes, Process, CACHEDPROCESSES},
|
|
serialization::{OutPointMemberMap, OutPointProcessMap},
|
|
silentpayments::SpWallet,
|
|
sp_client::{
|
|
bitcoin::{
|
|
consensus::deserialize,
|
|
hex::{DisplayHex, FromHex},
|
|
Amount, Network, Transaction,
|
|
},
|
|
silentpayments::SilentPaymentAddress,
|
|
},
|
|
MutexExt,
|
|
};
|
|
use sdk_common::{
|
|
sp_client::{
|
|
bitcoin::{secp256k1::rand::thread_rng, OutPoint},
|
|
SpClient, SpendKey,
|
|
},
|
|
updates::{init_update_sink, NativeUpdateSink, StateUpdate},
|
|
};
|
|
|
|
use serde_json::Value;
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
use anyhow::{Error, Result};
|
|
use zeromq::{Socket, SocketRecv};
|
|
|
|
mod commit;
|
|
mod config;
|
|
mod daemon;
|
|
mod faucet;
|
|
mod message;
|
|
mod scan;
|
|
mod peers;
|
|
mod sync;
|
|
|
|
use crate::config::Config;
|
|
use crate::peers as peer_store;
|
|
use crate::{
|
|
daemon::{Daemon, RpcCall},
|
|
scan::scan_blocks,
|
|
};
|
|
|
|
pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
|
|
|
|
type Tx = UnboundedSender<Message>;
|
|
|
|
type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
|
|
|
|
pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
|
|
|
|
pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
|
|
|
|
static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
|
|
|
|
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
|
|
|
|
pub fn lock_freezed_utxos() -> Result<MutexGuard<'static, HashSet<OutPoint>>, Error> {
|
|
FREEZED_UTXOS
|
|
.get_or_init(|| Mutex::new(HashSet::new()))
|
|
.lock_anyhow()
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct StateFile {
|
|
path: PathBuf,
|
|
}
|
|
|
|
impl StateFile {
|
|
fn new(path: PathBuf) -> Self {
|
|
Self { path }
|
|
}
|
|
|
|
fn create(&self) -> Result<()> {
|
|
let parent: PathBuf;
|
|
if let Some(dir) = self.path.parent() {
|
|
if !dir.ends_with(".4nk") {
|
|
return Err(Error::msg("parent dir must be \".4nk\""));
|
|
}
|
|
parent = dir.to_path_buf();
|
|
} else {
|
|
return Err(Error::msg("wallet file has no parent dir"));
|
|
}
|
|
|
|
// Ensure the parent directory exists
|
|
if !parent.exists() {
|
|
fs::create_dir_all(parent)?;
|
|
}
|
|
|
|
// Create a new file
|
|
fs::File::create(&self.path)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn save(&self, json: &Value) -> Result<()> {
|
|
let mut f = fs::File::options()
|
|
.write(true)
|
|
.truncate(true)
|
|
.open(&self.path)?;
|
|
|
|
let stringified = serde_json::to_string(&json)?;
|
|
let bin = stringified.as_bytes();
|
|
f.write_all(bin)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn load(&self) -> Result<Value> {
|
|
let mut f = fs::File::open(&self.path)?;
|
|
|
|
let mut content = vec![];
|
|
f.read_to_end(&mut content)?;
|
|
|
|
// Handle empty files or invalid JSON gracefully
|
|
if content.is_empty() {
|
|
return Ok(serde_json::Value::Object(serde_json::Map::new()));
|
|
}
|
|
|
|
let res: Value = serde_json::from_slice(&content)?;
|
|
|
|
Ok(res)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct DiskStorage {
|
|
pub wallet_file: StateFile,
|
|
pub processes_file: StateFile,
|
|
pub members_file: StateFile,
|
|
}
|
|
|
|
pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
|
|
|
|
const FAUCET_AMT: Amount = Amount::from_sat(10_000);
|
|
|
|
pub(crate) static WALLET: OnceLock<Mutex<SpWallet>> = OnceLock::new();
|
|
|
|
fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> {
|
|
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
|
|
|
|
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
|
daemon.test_mempool_accept(&tx)?;
|
|
daemon.broadcast(&tx)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_connection(
|
|
raw_stream: TcpStream,
|
|
addr: SocketAddr,
|
|
our_sp_address: SilentPaymentAddress,
|
|
) {
|
|
debug!("Incoming TCP connection from: {}", addr);
|
|
|
|
let peers = PEERMAP.get().expect("Peer Map not initialized");
|
|
|
|
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
|
|
Ok(stream) => {
|
|
debug!("WebSocket connection established");
|
|
stream
|
|
}
|
|
Err(e) => {
|
|
log::warn!("WebSocket handshake failed for {}: {} - This may be a non-WebSocket connection attempt", addr, e);
|
|
// Don't return immediately, try to handle gracefully
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Insert the write part of this peer to the peer map.
|
|
let (tx, rx) = unbounded_channel();
|
|
match peers.lock_anyhow() {
|
|
Ok(mut peer_map) => peer_map.insert(addr, tx),
|
|
Err(e) => {
|
|
log::error!("{}", e);
|
|
panic!();
|
|
}
|
|
};
|
|
|
|
let processes = lock_processes().unwrap().clone();
|
|
let members = lock_members().unwrap().clone();
|
|
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
|
|
|
|
let init_msg = HandshakeMessage::new(
|
|
our_sp_address.to_string(),
|
|
OutPointMemberMap(members),
|
|
OutPointProcessMap(processes),
|
|
current_tip.into(),
|
|
);
|
|
|
|
if let Err(e) = broadcast_message(
|
|
AnkFlag::Handshake,
|
|
format!("{}", init_msg.to_string()),
|
|
BroadcastType::Sender(addr),
|
|
) {
|
|
log::error!("Failed to send init message: {}", e);
|
|
return;
|
|
}
|
|
|
|
let (outgoing, incoming) = ws_stream.split();
|
|
|
|
let broadcast_incoming = incoming.try_for_each(|msg| {
|
|
if let Ok(raw_msg) = msg.to_text() {
|
|
// debug!("Received msg: {}", raw_msg);
|
|
process_message(raw_msg, addr);
|
|
} else {
|
|
debug!("Received non-text message {} from peer {}", msg, addr);
|
|
}
|
|
future::ok(())
|
|
});
|
|
|
|
let receive_from_others = UnboundedReceiverStream::new(rx)
|
|
.map(Ok)
|
|
.forward(outgoing)
|
|
.map(|result| {
|
|
if let Err(e) = result {
|
|
debug!("Error sending message: {}", e);
|
|
}
|
|
});
|
|
|
|
pin_mut!(broadcast_incoming, receive_from_others);
|
|
future::select(broadcast_incoming, receive_from_others).await;
|
|
|
|
debug!("{} disconnected", &addr);
|
|
peers.lock().unwrap().remove(&addr);
|
|
}
|
|
|
|
fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
|
|
let tx: Transaction = deserialize(&transaction)?;
|
|
|
|
if tx.is_coinbase() {
|
|
return Err(Error::msg("Can't process coinbase transaction"));
|
|
}
|
|
|
|
let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
|
|
|
|
let sp_wallet = WALLET
|
|
.get()
|
|
.ok_or_else(|| Error::msg("Wallet not initialized"))?
|
|
.lock_anyhow()?;
|
|
check_transaction_alone(sp_wallet, &tx, &partial_tweak)?;
|
|
|
|
Ok(NewTxMessage::new(
|
|
transaction.to_lower_hex_string(),
|
|
Some(partial_tweak.to_string()),
|
|
))
|
|
}
|
|
|
|
async fn handle_scan_updates(
|
|
scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
|
|
) {
|
|
while let Ok(update) = scan_rx.recv() {
|
|
log::debug!("Received scan update: {:?}", update);
|
|
}
|
|
}
|
|
|
|
async fn handle_state_updates(
|
|
state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
|
|
) {
|
|
while let Ok(update) = state_rx.recv() {
|
|
match update {
|
|
StateUpdate::Update {
|
|
blkheight,
|
|
blkhash,
|
|
found_outputs,
|
|
found_inputs,
|
|
} => {
|
|
// We update the wallet with found outputs and inputs
|
|
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
|
// inputs first
|
|
for outpoint in found_inputs {
|
|
sp_wallet.mark_output_mined(&outpoint, blkhash);
|
|
}
|
|
sp_wallet.get_mut_outputs().extend(found_outputs);
|
|
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
|
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
|
STORAGE
|
|
.get()
|
|
.unwrap()
|
|
.lock_anyhow()
|
|
.unwrap()
|
|
.wallet_file
|
|
.save(&json)
|
|
.unwrap();
|
|
}
|
|
StateUpdate::NoUpdate { blkheight } => {
|
|
// We just keep the current height to update the last_scan
|
|
debug!("No update, setting last scan at {}", blkheight);
|
|
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
|
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
|
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
|
STORAGE
|
|
.get()
|
|
.unwrap()
|
|
.lock_anyhow()
|
|
.unwrap()
|
|
.wallet_file
|
|
.save(&json)
|
|
.unwrap();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
|
debug!("Starting listening on Core");
|
|
let mut socket = zeromq::SubSocket::new();
|
|
socket.connect(&zmq_url).await.unwrap();
|
|
socket.subscribe("rawtx").await.unwrap();
|
|
socket.subscribe("hashblock").await.unwrap();
|
|
loop {
|
|
let core_msg = match socket.recv().await {
|
|
Ok(m) => m,
|
|
Err(e) => {
|
|
error!("Zmq error: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
debug!("Received a message");
|
|
|
|
let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1))
|
|
{
|
|
debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
|
|
match std::str::from_utf8(&topic) {
|
|
Ok("rawtx") => match create_new_tx_message(data.to_vec()) {
|
|
Ok(m) => {
|
|
debug!("Created message");
|
|
serde_json::to_string(&m).expect("This shouldn't fail")
|
|
}
|
|
Err(e) => {
|
|
error!("{}", e);
|
|
continue;
|
|
}
|
|
},
|
|
Ok("hashblock") => {
|
|
let current_height = DAEMON
|
|
.get()
|
|
.unwrap()
|
|
.lock_anyhow()
|
|
.unwrap()
|
|
.get_current_height()
|
|
.unwrap();
|
|
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
|
|
|
|
// Add retry logic for hashblock processing
|
|
let mut retry_count = 0;
|
|
const MAX_RETRIES: u32 = 4;
|
|
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
|
|
|
|
loop {
|
|
match scan_blocks(0, &blindbit_url).await {
|
|
Ok(_) => {
|
|
debug!("Successfully scanned blocks after {} retries", retry_count);
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
retry_count += 1;
|
|
if retry_count >= MAX_RETRIES {
|
|
error!(
|
|
"Failed to scan blocks after {} retries: {}",
|
|
MAX_RETRIES, e
|
|
);
|
|
break;
|
|
}
|
|
|
|
// Exponential backoff: 1s, 2s, 4s
|
|
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
|
|
warn!(
|
|
"Scan failed (attempt {}/{}), retrying in {}ms: {}",
|
|
retry_count, MAX_RETRIES, delay_ms, e
|
|
);
|
|
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
_ => {
|
|
error!("Unexpected message in zmq");
|
|
continue;
|
|
}
|
|
}
|
|
} else {
|
|
error!("Empty message");
|
|
continue;
|
|
};
|
|
|
|
if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) {
|
|
log::error!("{}", e.to_string());
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn handle_health_endpoint(mut stream: TcpStream) {
|
|
let response = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 15\r\n\r\n{\"status\":\"ok\"}";
|
|
let _ = stream.write_all(response.as_bytes()).await;
|
|
let _ = stream.shutdown().await;
|
|
}
|
|
|
|
async fn start_health_server(port: u16) {
|
|
// Use configurable bind address for health server
|
|
let bind_address = std::env::var("HEALTH_BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0".to_string());
|
|
let listener = match TcpListener::bind(format!("{}:{}", bind_address, port)).await {
|
|
Ok(listener) => listener,
|
|
Err(e) => {
|
|
log::error!("Failed to bind health server on port {}: {}", port, e);
|
|
return;
|
|
}
|
|
};
|
|
|
|
log::info!("Health server listening on port {}", port);
|
|
|
|
while let Ok((stream, _)) = listener.accept().await {
|
|
tokio::spawn(handle_health_endpoint(stream));
|
|
}
|
|
}
|
|
|
|
#[tokio::main(flavor = "multi_thread")]
|
|
async fn main() -> Result<()> {
|
|
env_logger::init();
|
|
|
|
// todo: take the path to conf file as argument
|
|
// default to "./.conf"
|
|
let config = Config::read_from_file(".conf")?;
|
|
|
|
if config.network == Network::Bitcoin {
|
|
warn!("Running on mainnet, you're on your own");
|
|
}
|
|
|
|
MESSAGECACHE
|
|
.set(MessageCache::new())
|
|
.expect("Message Cache initialization failed");
|
|
|
|
PEERMAP
|
|
.set(PeerMap::new(HashMap::new()))
|
|
.expect("PeerMap initialization failed");
|
|
|
|
// Connect the rpc daemon
|
|
DAEMON
|
|
.set(Mutex::new(Box::new(Daemon::connect(
|
|
config.core_wallet,
|
|
config.core_url,
|
|
config.network,
|
|
)?)))
|
|
.expect("DAEMON initialization failed");
|
|
|
|
let current_tip: u32 = DAEMON
|
|
.get()
|
|
.unwrap()
|
|
.lock_anyhow()?
|
|
.get_current_height()?
|
|
.try_into()?;
|
|
|
|
// Set CHAIN_TIP
|
|
CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
|
|
|
|
let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
|
|
app_dir.push(config.data_dir);
|
|
let mut wallet_file = app_dir.clone();
|
|
wallet_file.push(&config.wallet_name);
|
|
let mut processes_file = app_dir.clone();
|
|
processes_file.push("processes");
|
|
let mut members_file = app_dir.clone();
|
|
members_file.push("members");
|
|
|
|
let wallet_file = StateFile::new(wallet_file);
|
|
let processes_file = StateFile::new(processes_file);
|
|
let members_file = StateFile::new(members_file);
|
|
|
|
// load an existing sp_wallet, or create a new one
|
|
let sp_wallet: SpWallet = match wallet_file.load() {
|
|
Ok(wallet) => {
|
|
// TODO: Verify the wallet is compatible with the current network
|
|
serde_json::from_value(wallet)?
|
|
}
|
|
Err(_) => {
|
|
// Create a new wallet file if it doesn't exist or fails to load
|
|
wallet_file.create()?;
|
|
|
|
let mut rng = thread_rng();
|
|
|
|
let new_client = SpClient::new(
|
|
SecretKey::new(&mut rng),
|
|
SpendKey::Secret(SecretKey::new(&mut rng)),
|
|
config.network,
|
|
)
|
|
.expect("Failed to create a new SpClient");
|
|
|
|
let mut sp_wallet = SpWallet::new(new_client);
|
|
|
|
// Set birthday and update scan information
|
|
sp_wallet.set_birthday(current_tip);
|
|
sp_wallet.set_last_scan(current_tip);
|
|
|
|
// Save the newly created wallet to disk
|
|
let json = serde_json::to_value(sp_wallet.clone())?;
|
|
wallet_file.save(&json)?;
|
|
|
|
sp_wallet
|
|
}
|
|
};
|
|
|
|
let cached_processes: HashMap<OutPoint, Process> = match processes_file.load() {
|
|
Ok(processes) => {
|
|
let deserialized: OutPointProcessMap = serde_json::from_value(processes)?;
|
|
deserialized.0
|
|
}
|
|
Err(_) => {
|
|
debug!("creating process file at {}", processes_file.path.display());
|
|
processes_file.create()?;
|
|
|
|
HashMap::new()
|
|
}
|
|
};
|
|
|
|
let members: HashMap<OutPoint, Member> = match members_file.load() {
|
|
Ok(members) => {
|
|
let deserialized: OutPointMemberMap = serde_json::from_value(members)?;
|
|
deserialized.0
|
|
}
|
|
Err(_) => {
|
|
debug!("creating members file at {}", members_file.path.display());
|
|
members_file.create()?;
|
|
|
|
HashMap::new()
|
|
}
|
|
};
|
|
|
|
{
|
|
let utxo_to_freeze: HashSet<OutPoint> = cached_processes
|
|
.iter()
|
|
.map(|(_, process)| process.get_last_unspent_outpoint().unwrap())
|
|
.collect();
|
|
|
|
let mut freezed_utxos = lock_freezed_utxos()?;
|
|
*freezed_utxos = utxo_to_freeze;
|
|
}
|
|
|
|
let our_sp_address = sp_wallet.get_sp_client().get_receiving_address();
|
|
|
|
log::info!("Using wallet with address {}", our_sp_address,);
|
|
|
|
log::info!(
|
|
"Found {} outputs for a total balance of {}",
|
|
sp_wallet.get_outputs().len(),
|
|
sp_wallet.get_balance()
|
|
);
|
|
|
|
let last_scan = sp_wallet.get_last_scan();
|
|
|
|
WALLET
|
|
.set(Mutex::new(sp_wallet))
|
|
.expect("Failed to initialize WALLET");
|
|
|
|
CACHEDPROCESSES
|
|
.set(Mutex::new(cached_processes))
|
|
.expect("Failed to initialize CACHEDPROCESSES");
|
|
|
|
MEMBERLIST
|
|
.set(Mutex::new(members))
|
|
.expect("Failed to initialize MEMBERLIST");
|
|
|
|
let storage = DiskStorage {
|
|
wallet_file,
|
|
processes_file,
|
|
members_file,
|
|
};
|
|
|
|
STORAGE.set(Mutex::new(storage)).unwrap();
|
|
|
|
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
|
init_update_sink(Arc::new(sink));
|
|
|
|
// Spawn the update handlers
|
|
tokio::spawn(handle_scan_updates(scan_rx));
|
|
tokio::spawn(handle_state_updates(state_rx));
|
|
|
|
if last_scan < current_tip {
|
|
log::info!("Scanning for our outputs");
|
|
let blindbit_url = config.blindbit_url.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = scan_blocks(current_tip - last_scan, &blindbit_url).await {
|
|
log::error!("Failed to scan blocks: {}", e);
|
|
} else {
|
|
log::info!("Block scan completed successfully");
|
|
}
|
|
});
|
|
}
|
|
|
|
// Init peers store and optional bootstrap
|
|
peer_store::init(config.ws_url.clone(), config.bootstrap_url.clone());
|
|
if let Some(bs_url) = &config.bootstrap_url {
|
|
let bs_url = bs_url.clone();
|
|
let do_faucet = config.bootstrap_faucet;
|
|
let our_sp = our_sp_address.to_string();
|
|
tokio::spawn(async move {
|
|
log::info!("Starting bootstrap connection and sync");
|
|
if let Err(e) = crate::sync::bootstrap_connect_and_sync(bs_url.clone()).await {
|
|
log::warn!("bootstrap failed: {}", e);
|
|
} else {
|
|
log::info!("Bootstrap sync successful, checking faucet request");
|
|
if do_faucet && !peer_store::faucet_already_done(&bs_url) {
|
|
log::info!("Sending faucet request to bootstrap");
|
|
let env = sdk_common::network::Envelope {
|
|
flag: sdk_common::network::AnkFlag::Faucet,
|
|
content: serde_json::json!({
|
|
"sp_address": our_sp,
|
|
"commitment": "",
|
|
"error": null
|
|
}).to_string(),
|
|
};
|
|
if let Ok((mut ws, _)) = tokio_tungstenite::connect_async(&bs_url).await {
|
|
log::info!("Connected to bootstrap for faucet request");
|
|
let _ = ws
|
|
.send(tokio_tungstenite::tungstenite::Message::Text(
|
|
serde_json::to_string(&env).unwrap(),
|
|
))
|
|
.await;
|
|
log::info!("Faucet request sent to bootstrap");
|
|
peer_store::mark_faucet_done(&bs_url);
|
|
} else {
|
|
log::error!("Failed to connect to bootstrap for faucet request");
|
|
}
|
|
} else {
|
|
log::info!("Faucet request skipped: do_faucet={}, already_done={}", do_faucet, peer_store::faucet_already_done(&bs_url));
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// Subscribe to Bitcoin Core
|
|
let zmq_url = config.zmq_url.clone();
|
|
let blindbit_url = config.blindbit_url.clone();
|
|
tokio::spawn(async move {
|
|
handle_zmq(zmq_url, blindbit_url).await;
|
|
});
|
|
|
|
// Create the event loop and TCP listener we'll accept connections on.
|
|
// Try to bind with retry logic
|
|
let mut listener = None;
|
|
let mut retry_count = 0;
|
|
const MAX_RETRIES: u32 = 5;
|
|
const RETRY_DELAY_MS: u64 = 1000;
|
|
|
|
// Allow environment variable override of ws_url
|
|
let ws_bind_url = std::env::var("WS_BIND_URL").unwrap_or_else(|_| config.ws_url.clone());
|
|
log::info!("Using WebSocket bind URL: {}", ws_bind_url);
|
|
|
|
while listener.is_none() && retry_count < MAX_RETRIES {
|
|
let try_socket = TcpListener::bind(ws_bind_url.clone()).await;
|
|
match try_socket {
|
|
Ok(socket) => {
|
|
log::info!("Successfully bound to {}", ws_bind_url);
|
|
listener = Some(socket);
|
|
}
|
|
Err(e) => {
|
|
retry_count += 1;
|
|
log::warn!("Failed to bind to {} (attempt {}/{}): {}", ws_bind_url, retry_count, MAX_RETRIES, e);
|
|
if retry_count < MAX_RETRIES {
|
|
log::info!("Retrying in {}ms...", RETRY_DELAY_MS);
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
|
|
} else {
|
|
log::error!("Failed to bind to {} after {} attempts: {}", ws_bind_url, MAX_RETRIES, e);
|
|
return Err(anyhow::anyhow!("Failed to bind to {} after {} attempts: {}", ws_bind_url, MAX_RETRIES, e));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let listener = listener.unwrap();
|
|
|
|
tokio::spawn(MessageCache::clean_up());
|
|
|
|
// Start health server on configurable port
|
|
let health_port = std::env::var("HEALTH_PORT")
|
|
.ok()
|
|
.and_then(|p| p.parse::<u16>().ok())
|
|
.unwrap_or(8091);
|
|
tokio::spawn(start_health_server(health_port));
|
|
|
|
// Let's spawn the handling of each connection in a separate task.
|
|
while let Ok((stream, addr)) = listener.accept().await {
|
|
tokio::spawn(handle_connection(stream, addr, our_sp_address));
|
|
}
|
|
|
|
Ok(())
|
|
}
|