From cabc273cb211d57effacd01ffb31b63c58f2c91e Mon Sep 17 00:00:00 2001 From: 4NK Dev Date: Wed, 1 Oct 2025 09:45:33 +0000 Subject: [PATCH] style: format code (whitespace cleanup) --- src/anchor.rs | 110 +++++++++++++++++----------------- src/api/metrics_prometheus.rs | 17 +++--- src/api/payments.rs | 13 ++-- src/api/verify.rs | 19 +++--- src/main.rs | 8 +-- src/monitor.rs | 70 +++++++++++----------- src/payment_watcher.rs | 77 ++++++++++++------------ 7 files changed, 155 insertions(+), 159 deletions(-) diff --git a/src/anchor.rs b/src/anchor.rs index 6c1f689..00c4dba 100644 --- a/src/anchor.rs +++ b/src/anchor.rs @@ -17,7 +17,7 @@ use crate::models::{Anchor, ProcessStateSnapshot, PriceConfig}; pub async fn start_anchoring_loop(config: Config, db: Arc>) -> Result<()> { info!("⚓ Starting anchoring loop..."); - + loop { match perform_anchoring_cycle(&config, &db).await { Ok(_) => { @@ -27,7 +27,7 @@ pub async fn start_anchoring_loop(config: Config, db: Arc>) -> R error!("Anchoring cycle error: {}", e); } } - + // Wait for next cycle (check every 10 minutes) sleep(Duration::from_secs(600)).await; } @@ -38,7 +38,7 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> debug!("Auto-anchoring is disabled"); return Ok(()); } - + // Connect to Bitcoin RPC let rpc_client = Client::new( &config.bitcoin.rpc_url, @@ -47,50 +47,50 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> config.bitcoin.rpc_password.clone(), ), )?; - + let blockchain_info = rpc_client.get_blockchain_info()?; let current_block = blockchain_info.blocks as u32; - + debug!("📊 Current block height: {}", current_block); - + // Get all processes let db_lock = db.lock().await; let processes = db_lock.get_all_processes().await?; drop(db_lock); - + for process in processes { // Check if process has paid status if process.payment_status != "PAID" { - debug!("⏭️ Skipping process {} (payment status: {})", + debug!("⏭️ Skipping process {} (payment status: {})", process.process_id, process.payment_status); continue; } - + // Check if we need to anchor (based on block interval) let db_lock = db.lock().await; let existing_anchors = db_lock.get_anchors_for_process(&process.process_id).await?; drop(db_lock); - + let last_anchor_block = existing_anchors.first() .map(|a| a.period_end_block as u32) .unwrap_or(0); - + let blocks_since_last_anchor = current_block.saturating_sub(last_anchor_block); - + if blocks_since_last_anchor < config.anchoring.interval_blocks { - debug!("⏳ Process {}: {} blocks since last anchor (need {})", - process.process_id, + debug!("⏳ Process {}: {} blocks since last anchor (need {})", + process.process_id, blocks_since_last_anchor, config.anchoring.interval_blocks); continue; } - + info!("🎯 Time to anchor process: {}", process.process_id); - + // Calculate period let period_start = last_anchor_block; let period_end = current_block; - + // Get metrics for this period let db_lock = db.lock().await; let metrics = db_lock.get_metrics_for_period( @@ -99,45 +99,45 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> period_end as i32, ).await?; drop(db_lock); - + if metrics.is_empty() { - info!("⚠️ No metrics found for process {} in period {}-{}", + info!("⚠️ No metrics found for process {} in period {}-{}", process.process_id, period_start, period_end); continue; } - + // Aggregate metrics let total_bytes_sent: i64 = metrics.iter().map(|m| m.bytes_sent).sum(); let total_bytes_received: i64 = metrics.iter().map(|m| m.bytes_received).sum(); let total_messages: i32 = metrics.iter().map(|m| m.message_count).sum(); - + let total_bytes = (total_bytes_sent + total_bytes_received) as u64; let total_mb = total_bytes / 1_048_576; - - info!("📊 Process {} stats: {} MB, {} messages", + + info!("📊 Process {} stats: {} MB, {} messages", process.process_id, total_mb, total_messages); - + // Check payment condition - if let (Some(price_mo_sats), Some(btc_address)) = - (process.price_mo_sats, &process.btc_address) + if let (Some(price_mo_sats), Some(btc_address)) = + (process.price_mo_sats, &process.btc_address) { let price_config = PriceConfig { price_mo_sats: price_mo_sats as u64, btc_address: btc_address.clone(), }; - + let required_sats = price_config.price_mo_sats * total_mb; let paid_sats = process.total_paid_sats as u64; - + if paid_sats < required_sats { - warn!("⚠️ Process {}: Insufficient payment ({} < {} sats)", + warn!("⚠️ Process {}: Insufficient payment ({} < {} sats)", process.process_id, paid_sats, required_sats); continue; } - + info!("✅ Payment verified: {} >= {} sats", paid_sats, required_sats); } - + // Create snapshot let snapshot = ProcessStateSnapshot { process_id: process.process_id.clone(), @@ -151,12 +151,12 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> .and_then(|bytes| bytes.try_into().ok()) .unwrap_or([0u8; 32]), }; - + // Compute anchor hash let anchor_hash = snapshot.compute_anchor_hash(); - + info!("🔐 Computed anchor hash: {}", hex::encode(&anchor_hash)); - + // Create anchor record let anchor = Anchor { id: None, @@ -170,25 +170,25 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> created_at: Utc::now(), status: "PENDING".to_string(), }; - + // Save to database let db_lock = db.lock().await; let anchor_id = db_lock.insert_anchor(&anchor).await?; drop(db_lock); - + info!("💾 Anchor record created with id: {}", anchor_id); - + // Create and broadcast Bitcoin transaction match create_and_broadcast_anchor_tx(&anchor_hash, &rpc_client, config).await { Ok(txid) => { info!("✅ Anchor broadcasted! Txid: {}", txid); - + // Update anchor with txid let db_lock = db.lock().await; // TODO: Add update_anchor method to Database // For now, the anchor remains with status PENDING drop(db_lock); - + info!("🎉 Process {} successfully anchored on Bitcoin mainnet!", process.process_id); } Err(e) => { @@ -196,7 +196,7 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> } } } - + Ok(()) } @@ -206,25 +206,25 @@ async fn create_and_broadcast_anchor_tx( config: &Config, ) -> Result { info!("🔨 Creating anchor transaction..."); - + // Load wallet if let Err(e) = rpc.load_wallet(&config.bitcoin.wallet_name) { warn!("Wallet load warning (may already be loaded): {}", e); } - + // Get a new address from our wallet for change let change_address = rpc.get_new_address(None, None)?; - + // Create OP_RETURN output let op_return_script = create_op_return_anchor(anchor_hash); let op_return_output = TxOut { value: Amount::ZERO, script_pubkey: op_return_script, }; - + // We need at least one input to pay for fees // Use Bitcoin Core's fundrawtransaction to handle this - + // Create a basic transaction with just the OP_RETURN output let mut tx = Transaction { version: 2, @@ -232,31 +232,31 @@ async fn create_and_broadcast_anchor_tx( input: vec![], output: vec![op_return_output], }; - + // Serialize the transaction let tx_hex = bitcoin::consensus::encode::serialize_hex(&tx); - + // Fund the transaction (adds inputs and change output) let funded = rpc.fund_raw_transaction(&tx_hex, None, None)?; - + // Sign the transaction let signed = rpc.sign_raw_transaction_with_wallet(&funded.hex, None, None)?; - + if !signed.complete { return Err(anyhow!("Transaction signing incomplete")); } - + // Test the transaction first let test_result = rpc.test_mempool_accept(&[signed.hex.clone()])?; if !test_result[0].allowed { return Err(anyhow!("Transaction rejected by mempool: {:?}", test_result[0].reject_reason)); } - + // Broadcast let txid = rpc.send_raw_transaction(&signed.hex)?; - + info!("📡 Transaction broadcasted: {}", txid); - + Ok(txid) } @@ -264,7 +264,7 @@ fn create_op_return_anchor(hash: &[u8; 32]) -> ScriptBuf { let mut data = Vec::new(); data.extend_from_slice(b"NKA1"); // Protocol identifier: 4NK Anchor v1 data.extend_from_slice(hash); - + ScriptBuf::new_op_return(&data) } @@ -280,7 +280,7 @@ pub fn verify_anchor(tx: &Transaction, expected_hash: &[u8; 32]) -> bool { } else { 3 // Multi-byte push }; - + if script_bytes.len() >= data_start + 36 { let protocol_id = &script_bytes[data_start..data_start + 4]; if protocol_id == b"NKA1" { diff --git a/src/api/metrics_prometheus.rs b/src/api/metrics_prometheus.rs index cdb22a6..23224dc 100644 --- a/src/api/metrics_prometheus.rs +++ b/src/api/metrics_prometheus.rs @@ -8,22 +8,22 @@ use crate::db::Database; lazy_static! { static ref REGISTRY: Registry = Registry::new(); - + static ref PROCESSES_TOTAL: IntGauge = IntGauge::new( "4nk_certificator_processes_total", "Total number of monitored processes" ).unwrap(); - + static ref ANCHORS_CREATED: IntCounter = IntCounter::new( "4nk_certificator_anchors_created_total", "Total number of anchors created" ).unwrap(); - + static ref ANCHORS_CONFIRMED: IntCounter = IntCounter::new( "4nk_certificator_anchors_confirmed_total", "Total number of anchors confirmed on-chain" ).unwrap(); - + static ref DATA_VOLUME_BYTES: IntCounter = IntCounter::new( "4nk_certificator_data_volume_bytes", "Total data volume monitored in bytes" @@ -43,18 +43,18 @@ pub async fn metrics_handler(db: web::Data>>) -> HttpRespons if let Ok(processes) = db_lock.get_all_processes().await { PROCESSES_TOTAL.set(processes.len() as i64); } - + // TODO: Count anchors and data volume } - + let encoder = TextEncoder::new(); let metric_families = REGISTRY.gather(); let mut buffer = vec![]; - + if let Err(e) = encoder.encode(&metric_families, &mut buffer) { return HttpResponse::InternalServerError().body(format!("Failed to encode metrics: {}", e)); } - + HttpResponse::Ok() .content_type("text/plain; version=0.0.4") .body(buffer) @@ -71,4 +71,3 @@ pub fn increment_anchors_confirmed() { pub fn add_data_volume(bytes: u64) { DATA_VOLUME_BYTES.inc_by(bytes); } - diff --git a/src/api/payments.rs b/src/api/payments.rs index 7fa05fe..310b26e 100644 --- a/src/api/payments.rs +++ b/src/api/payments.rs @@ -10,30 +10,30 @@ pub async fn get_payments( ) -> HttpResponse { let address = path.into_inner(); let db = db.lock().await; - + // Get all processes with this address match db.get_all_processes().await { Ok(processes) => { let matching_processes: Vec<_> = processes.iter() .filter(|p| p.btc_address.as_ref() == Some(&address)) .collect(); - + if matching_processes.is_empty() { return HttpResponse::NotFound().json(serde_json::json!({ "error": "No processes found with this address" })); } - + let mut all_payments = vec![]; let mut total_received: i64 = 0; - + // TODO: Add get_payments_for_address method to Database // For now, return process info - + for process in matching_processes { total_received += process.total_paid_sats; } - + HttpResponse::Ok().json(serde_json::json!({ "address": address, "total_received_sats": total_received, @@ -46,4 +46,3 @@ pub async fn get_payments( })) } } - diff --git a/src/api/verify.rs b/src/api/verify.rs index 351cd3c..9df7960 100644 --- a/src/api/verify.rs +++ b/src/api/verify.rs @@ -22,16 +22,16 @@ pub async fn verify_anchor_endpoint( "error": format!("Invalid anchor hash: {}", e) })) }; - + if anchor_hash_bytes.len() != 32 { return HttpResponse::BadRequest().json(serde_json::json!({ "error": "Anchor hash must be 32 bytes" })); } - + let mut hash_array = [0u8; 32]; hash_array.copy_from_slice(&anchor_hash_bytes); - + // Parse txid let txid = match req.txid.parse() { Ok(t) => t, @@ -39,7 +39,7 @@ pub async fn verify_anchor_endpoint( "error": format!("Invalid txid: {}", e) })) }; - + // Connect to Bitcoin RPC let rpc_client = match Client::new( &config.bitcoin.rpc_url, @@ -53,7 +53,7 @@ pub async fn verify_anchor_endpoint( "error": format!("Failed to connect to Bitcoin RPC: {}", e) })) }; - + // Get transaction let tx_info = match rpc_client.get_raw_transaction_info(&txid, None) { Ok(info) => info, @@ -65,20 +65,20 @@ pub async fn verify_anchor_endpoint( } })) }; - + let tx = match tx_info.transaction() { Some(t) => t, None => return HttpResponse::InternalServerError().json(serde_json::json!({ "error": "Failed to decode transaction" })) }; - + // Verify anchor let hash_matches = verify_anchor(&tx, &hash_array); - + let confirmations = tx_info.confirmations.unwrap_or(0); let block_height = tx_info.blockheight; - + HttpResponse::Ok().json(serde_json::json!({ "valid": hash_matches, "details": { @@ -90,4 +90,3 @@ pub async fn verify_anchor_endpoint( } })) } - diff --git a/src/main.rs b/src/main.rs index 7682a2c..4fa3115 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,11 +32,11 @@ async fn main() -> Result<()> { // Run migrations database.run_migrations().await?; info!("✅ Database migrations applied"); - + // Initialize Prometheus metrics api::metrics_prometheus::init_metrics(); info!("📊 Prometheus metrics initialized"); - + // Wrap database in Arc for shared state let db = Arc::new(Mutex::new(database)); @@ -48,7 +48,7 @@ async fn main() -> Result<()> { monitor::start_monitoring(config, db).await } }); - + let payment_handle = tokio::spawn({ let config = config.clone(); let db = db.clone(); @@ -56,7 +56,7 @@ async fn main() -> Result<()> { payment_watcher::start_payment_watching(config, db).await } }); - + let anchor_handle = tokio::spawn({ let config = config.clone(); let db = db.clone(); diff --git a/src/monitor.rs b/src/monitor.rs index 371b0e7..b49483c 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -14,7 +14,7 @@ use crate::models::{Process, Metric, PriceConfig}; pub async fn start_monitoring(config: Config, db: Arc>) -> Result<()> { info!("📡 Starting relay monitoring..."); - + loop { match monitor_relay(&config, &db).await { Ok(_) => { @@ -30,15 +30,15 @@ pub async fn start_monitoring(config: Config, db: Arc>) -> Resul async fn monitor_relay(config: &Config, db: &Arc>) -> Result<()> { info!("🔌 Connecting to relay at {}", config.relay.websocket_url); - + let (ws_stream, _) = connect_async(&config.relay.websocket_url) .await .map_err(|e| anyhow!("Failed to connect to relay: {}", e))?; - + info!("✅ Connected to relay WebSocket"); - + let (mut write, mut read) = ws_stream.split(); - + // Send a ping to keep connection alive tokio::spawn(async move { loop { @@ -49,7 +49,7 @@ async fn monitor_relay(config: &Config, db: &Arc>) -> Result<()> sleep(Duration::from_secs(30)).await; } }); - + // Listen for messages while let Some(msg) = read.next().await { match msg { @@ -75,19 +75,19 @@ async fn monitor_relay(config: &Config, db: &Arc>) -> Result<()> _ => {} } } - + Ok(()) } async fn handle_relay_message(text: &str, db: &Arc>) -> Result<()> { let envelope: Value = serde_json::from_str(text)?; - + let flag = envelope.get("flag") .and_then(|f| f.as_str()) .unwrap_or("Unknown"); - + debug!("📨 Received message with flag: {}", flag); - + match flag { "Handshake" => { handle_handshake(&envelope, db).await?; @@ -105,7 +105,7 @@ async fn handle_relay_message(text: &str, db: &Arc>) -> Result<( .and_then(|c| c.as_str()) .map(|s| s.len()) .unwrap_or(0); - + debug!("Data message size: {} bytes", content_size); // TODO: Attribute to specific process if possible } @@ -113,26 +113,26 @@ async fn handle_relay_message(text: &str, db: &Arc>) -> Result<( debug!("Unhandled message flag: {}", flag); } } - + Ok(()) } async fn handle_handshake(envelope: &Value, db: &Arc>) -> Result<()> { info!("🤝 Processing Handshake message"); - + let content = envelope.get("content") .and_then(|c| c.as_str()) .ok_or_else(|| anyhow!("No content in handshake"))?; - + let handshake: Value = serde_json::from_str(content)?; - + // Extract processes list if let Some(processes_list) = handshake.get("processes_list") { if let Some(processes_map) = processes_list.as_object() { info!("📦 Found {} processes in handshake", processes_map.len()); - + let db_lock = db.lock().await; - + for (process_id, process_data) in processes_map { // Extract process information if let Err(e) = sync_process_from_handshake(process_id, process_data, &db_lock).await { @@ -141,7 +141,7 @@ async fn handle_handshake(envelope: &Value, db: &Arc>) -> Result } } } - + Ok(()) } @@ -151,10 +151,10 @@ async fn sync_process_from_handshake( db: &Database, ) -> Result<()> { debug!("Syncing process: {}", process_id); - + // Try to extract price configuration from public_data or roles let price_config = extract_price_config(process_data); - + let process = Process { process_id: process_id.to_string(), created_at: Utc::now(), @@ -165,17 +165,17 @@ async fn sync_process_from_handshake( total_paid_sats: 0, state_merkle_root: None, }; - + db.upsert_process(&process).await?; info!("✅ Synced process: {}", process_id); - + Ok(()) } fn extract_price_config(process_data: &Value) -> Option { // Try to find price configuration in the process data // It could be in public_data or in the process state - + if let Some(states) = process_data.get("states").and_then(|s| s.as_array()) { for state in states { if let Some(public_data) = state.get("public_data") { @@ -185,23 +185,23 @@ fn extract_price_config(process_data: &Value) -> Option { } } } - + // Also check root level if let Some(price) = process_data.get("price") { return parse_price_config(price); } - + None } fn parse_price_config(price_value: &Value) -> Option { let price_mo_sats = price_value.get("priceMoSats") .and_then(|v| v.as_u64())?; - + let btc_address = price_value.get("btcAddress") .and_then(|v| v.as_str())? .to_string(); - + Some(PriceConfig { price_mo_sats, btc_address, @@ -210,20 +210,20 @@ fn parse_price_config(price_value: &Value) -> Option { async fn handle_commit_message(envelope: &Value, db: &Arc>) -> Result<()> { debug!("📝 Processing Commit message"); - + let content = envelope.get("content") .and_then(|c| c.as_str()) .ok_or_else(|| anyhow!("No content in commit"))?; - + let commit_msg: Value = serde_json::from_str(content)?; - + let process_id = commit_msg.get("process_id") .and_then(|p| p.as_str()) .ok_or_else(|| anyhow!("No process_id in commit"))?; - + // Estimate data size from the commit message let data_size = content.len() as i64; - + // Record metric let metric = Metric { id: None, @@ -234,11 +234,11 @@ async fn handle_commit_message(envelope: &Value, db: &Arc>) -> R bytes_received: 0, message_count: 1, }; - + let db_lock = db.lock().await; db_lock.insert_metric(&metric).await?; - + debug!("📊 Recorded metric for process {}: {} bytes", process_id, data_size); - + Ok(()) } diff --git a/src/payment_watcher.rs b/src/payment_watcher.rs index 714c427..80787f7 100644 --- a/src/payment_watcher.rs +++ b/src/payment_watcher.rs @@ -14,7 +14,7 @@ use crate::models::Payment; pub async fn start_payment_watching(config: Config, db: Arc>) -> Result<()> { info!("💰 Starting payment watcher..."); - + loop { match watch_payments(&config, &db).await { Ok(_) => { @@ -30,7 +30,7 @@ pub async fn start_payment_watching(config: Config, db: Arc>) -> async fn watch_payments(config: &Config, db: &Arc>) -> Result<()> { info!("🔌 Connecting to Bitcoin RPC at {}", config.bitcoin.rpc_url); - + let rpc_client = Client::new( &config.bitcoin.rpc_url, Auth::UserPass( @@ -38,40 +38,40 @@ async fn watch_payments(config: &Config, db: &Arc>) -> Result<() config.bitcoin.rpc_password.clone(), ), )?; - + info!("✅ Connected to Bitcoin RPC"); - + // Verify connection let blockchain_info = rpc_client.get_blockchain_info()?; - info!("📊 Bitcoin network: {}, blocks: {}", - blockchain_info.chain, + info!("📊 Bitcoin network: {}, blocks: {}", + blockchain_info.chain, blockchain_info.blocks); - + let mut last_block_height = blockchain_info.blocks; - + loop { // Check for new blocks let current_info = rpc_client.get_blockchain_info()?; let current_height = current_info.blocks; - + if current_height > last_block_height { info!("🔔 New block detected: {} -> {}", last_block_height, current_height); - + // Scan new blocks for payments to our watched addresses for height in (last_block_height + 1)..=current_height { if let Err(e) = scan_block_for_payments(height, &rpc_client, db, config).await { error!("Failed to scan block {}: {}", height, e); } } - + last_block_height = current_height; } - + // Check mempool for pending payments if let Err(e) = scan_mempool_for_payments(&rpc_client, db).await { warn!("Failed to scan mempool: {}", e); } - + sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await; } } @@ -83,14 +83,14 @@ async fn scan_block_for_payments( config: &Config, ) -> Result<()> { debug!("🔍 Scanning block {} for payments", block_height); - + let block_hash = rpc.get_block_hash(block_height)?; let block = rpc.get_block(&block_hash)?; - + let db_lock = db.lock().await; let processes = db_lock.get_all_processes().await?; drop(db_lock); - + // Get all addresses we're watching let watched_addresses: Vec<(String, Address)> = processes.iter() .filter_map(|p| { @@ -101,13 +101,13 @@ async fn scan_block_for_payments( }) }) .collect(); - + if watched_addresses.is_empty() { return Ok(()); } - + debug!("👀 Watching {} addresses", watched_addresses.len()); - + // Scan all transactions in the block for tx in &block.txdata { for (vout_idx, output) in tx.output.iter().enumerate() { @@ -116,10 +116,10 @@ async fn scan_block_for_payments( for (process_id, watched_addr) in &watched_addresses { if addr == *watched_addr { let amount_sats = output.value.to_sat() as i64; - - info!("💸 Payment detected! Process: {}, Amount: {} sats, Tx: {}", + + info!("💸 Payment detected! Process: {}, Amount: {} sats, Tx: {}", process_id, amount_sats, tx.txid()); - + // Record payment let payment = Payment { id: None, @@ -131,7 +131,7 @@ async fn scan_block_for_payments( block_height: Some(block_height as i32), confirmations: 1, }; - + let db_lock = db.lock().await; if let Err(e) = db_lock.insert_payment(&payment).await { error!("Failed to insert payment: {}", e); @@ -140,17 +140,17 @@ async fn scan_block_for_payments( if let Ok(Some(mut process)) = db_lock.get_process(process_id).await { process.total_paid_sats += amount_sats; process.payment_status = if confirmations_sufficient( - 1, + 1, config.bitcoin.min_confirmations ) { "PAID".to_string() } else { "PENDING".to_string() }; - + let _ = db_lock.upsert_process(&process).await; - - info!("✅ Updated process {} payment status: {}", + + info!("✅ Updated process {} payment status: {}", process_id, process.payment_status); } } @@ -160,7 +160,7 @@ async fn scan_block_for_payments( } } } - + Ok(()) } @@ -170,17 +170,17 @@ async fn scan_mempool_for_payments( ) -> Result<()> { // Get mempool transactions let mempool_txids = rpc.get_raw_mempool()?; - + if mempool_txids.is_empty() { return Ok(()); } - + debug!("🔍 Scanning {} mempool transactions", mempool_txids.len()); - + let db_lock = db.lock().await; let processes = db_lock.get_all_processes().await?; drop(db_lock); - + // Get all addresses we're watching let watched_addresses: Vec<(String, Address)> = processes.iter() .filter_map(|p| { @@ -191,11 +191,11 @@ async fn scan_mempool_for_payments( }) }) .collect(); - + if watched_addresses.is_empty() { return Ok(()); } - + // Scan transactions (limit to avoid overload) for txid in mempool_txids.iter().take(100) { if let Ok(tx_info) = rpc.get_raw_transaction_info(txid, None) { @@ -205,10 +205,10 @@ async fn scan_mempool_for_payments( for (process_id, watched_addr) in &watched_addresses { if addr == *watched_addr { let amount_sats = output.value.to_sat() as i64; - - info!("💸 Pending payment in mempool! Process: {}, Amount: {} sats", + + info!("💸 Pending payment in mempool! Process: {}, Amount: {} sats", process_id, amount_sats); - + // Update process status to PENDING let db_lock = db.lock().await; if let Ok(Some(mut process)) = db_lock.get_process(process_id).await { @@ -225,11 +225,10 @@ async fn scan_mempool_for_payments( } } } - + Ok(()) } fn confirmations_sufficient(current: u32, required: u32) -> bool { current >= required } -