style: format code (whitespace cleanup)

This commit is contained in:
4NK Dev 2025-10-01 09:45:33 +00:00
parent 3b3650d7de
commit cabc273cb2
7 changed files with 155 additions and 159 deletions

View File

@ -17,7 +17,7 @@ use crate::models::{Anchor, ProcessStateSnapshot, PriceConfig};
pub async fn start_anchoring_loop(config: Config, db: Arc<Mutex<Database>>) -> Result<()> { pub async fn start_anchoring_loop(config: Config, db: Arc<Mutex<Database>>) -> Result<()> {
info!("⚓ Starting anchoring loop..."); info!("⚓ Starting anchoring loop...");
loop { loop {
match perform_anchoring_cycle(&config, &db).await { match perform_anchoring_cycle(&config, &db).await {
Ok(_) => { Ok(_) => {
@ -27,7 +27,7 @@ pub async fn start_anchoring_loop(config: Config, db: Arc<Mutex<Database>>) -> R
error!("Anchoring cycle error: {}", e); error!("Anchoring cycle error: {}", e);
} }
} }
// Wait for next cycle (check every 10 minutes) // Wait for next cycle (check every 10 minutes)
sleep(Duration::from_secs(600)).await; sleep(Duration::from_secs(600)).await;
} }
@ -38,7 +38,7 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
debug!("Auto-anchoring is disabled"); debug!("Auto-anchoring is disabled");
return Ok(()); return Ok(());
} }
// Connect to Bitcoin RPC // Connect to Bitcoin RPC
let rpc_client = Client::new( let rpc_client = Client::new(
&config.bitcoin.rpc_url, &config.bitcoin.rpc_url,
@ -47,50 +47,50 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
config.bitcoin.rpc_password.clone(), config.bitcoin.rpc_password.clone(),
), ),
)?; )?;
let blockchain_info = rpc_client.get_blockchain_info()?; let blockchain_info = rpc_client.get_blockchain_info()?;
let current_block = blockchain_info.blocks as u32; let current_block = blockchain_info.blocks as u32;
debug!("📊 Current block height: {}", current_block); debug!("📊 Current block height: {}", current_block);
// Get all processes // Get all processes
let db_lock = db.lock().await; let db_lock = db.lock().await;
let processes = db_lock.get_all_processes().await?; let processes = db_lock.get_all_processes().await?;
drop(db_lock); drop(db_lock);
for process in processes { for process in processes {
// Check if process has paid status // Check if process has paid status
if process.payment_status != "PAID" { if process.payment_status != "PAID" {
debug!("⏭️ Skipping process {} (payment status: {})", debug!("⏭️ Skipping process {} (payment status: {})",
process.process_id, process.payment_status); process.process_id, process.payment_status);
continue; continue;
} }
// Check if we need to anchor (based on block interval) // Check if we need to anchor (based on block interval)
let db_lock = db.lock().await; let db_lock = db.lock().await;
let existing_anchors = db_lock.get_anchors_for_process(&process.process_id).await?; let existing_anchors = db_lock.get_anchors_for_process(&process.process_id).await?;
drop(db_lock); drop(db_lock);
let last_anchor_block = existing_anchors.first() let last_anchor_block = existing_anchors.first()
.map(|a| a.period_end_block as u32) .map(|a| a.period_end_block as u32)
.unwrap_or(0); .unwrap_or(0);
let blocks_since_last_anchor = current_block.saturating_sub(last_anchor_block); let blocks_since_last_anchor = current_block.saturating_sub(last_anchor_block);
if blocks_since_last_anchor < config.anchoring.interval_blocks { if blocks_since_last_anchor < config.anchoring.interval_blocks {
debug!("⏳ Process {}: {} blocks since last anchor (need {})", debug!("⏳ Process {}: {} blocks since last anchor (need {})",
process.process_id, process.process_id,
blocks_since_last_anchor, blocks_since_last_anchor,
config.anchoring.interval_blocks); config.anchoring.interval_blocks);
continue; continue;
} }
info!("🎯 Time to anchor process: {}", process.process_id); info!("🎯 Time to anchor process: {}", process.process_id);
// Calculate period // Calculate period
let period_start = last_anchor_block; let period_start = last_anchor_block;
let period_end = current_block; let period_end = current_block;
// Get metrics for this period // Get metrics for this period
let db_lock = db.lock().await; let db_lock = db.lock().await;
let metrics = db_lock.get_metrics_for_period( let metrics = db_lock.get_metrics_for_period(
@ -99,45 +99,45 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
period_end as i32, period_end as i32,
).await?; ).await?;
drop(db_lock); drop(db_lock);
if metrics.is_empty() { if metrics.is_empty() {
info!("⚠️ No metrics found for process {} in period {}-{}", info!("⚠️ No metrics found for process {} in period {}-{}",
process.process_id, period_start, period_end); process.process_id, period_start, period_end);
continue; continue;
} }
// Aggregate metrics // Aggregate metrics
let total_bytes_sent: i64 = metrics.iter().map(|m| m.bytes_sent).sum(); let total_bytes_sent: i64 = metrics.iter().map(|m| m.bytes_sent).sum();
let total_bytes_received: i64 = metrics.iter().map(|m| m.bytes_received).sum(); let total_bytes_received: i64 = metrics.iter().map(|m| m.bytes_received).sum();
let total_messages: i32 = metrics.iter().map(|m| m.message_count).sum(); let total_messages: i32 = metrics.iter().map(|m| m.message_count).sum();
let total_bytes = (total_bytes_sent + total_bytes_received) as u64; let total_bytes = (total_bytes_sent + total_bytes_received) as u64;
let total_mb = total_bytes / 1_048_576; let total_mb = total_bytes / 1_048_576;
info!("📊 Process {} stats: {} MB, {} messages", info!("📊 Process {} stats: {} MB, {} messages",
process.process_id, total_mb, total_messages); process.process_id, total_mb, total_messages);
// Check payment condition // Check payment condition
if let (Some(price_mo_sats), Some(btc_address)) = if let (Some(price_mo_sats), Some(btc_address)) =
(process.price_mo_sats, &process.btc_address) (process.price_mo_sats, &process.btc_address)
{ {
let price_config = PriceConfig { let price_config = PriceConfig {
price_mo_sats: price_mo_sats as u64, price_mo_sats: price_mo_sats as u64,
btc_address: btc_address.clone(), btc_address: btc_address.clone(),
}; };
let required_sats = price_config.price_mo_sats * total_mb; let required_sats = price_config.price_mo_sats * total_mb;
let paid_sats = process.total_paid_sats as u64; let paid_sats = process.total_paid_sats as u64;
if paid_sats < required_sats { if paid_sats < required_sats {
warn!("⚠️ Process {}: Insufficient payment ({} < {} sats)", warn!("⚠️ Process {}: Insufficient payment ({} < {} sats)",
process.process_id, paid_sats, required_sats); process.process_id, paid_sats, required_sats);
continue; continue;
} }
info!("✅ Payment verified: {} >= {} sats", paid_sats, required_sats); info!("✅ Payment verified: {} >= {} sats", paid_sats, required_sats);
} }
// Create snapshot // Create snapshot
let snapshot = ProcessStateSnapshot { let snapshot = ProcessStateSnapshot {
process_id: process.process_id.clone(), process_id: process.process_id.clone(),
@ -151,12 +151,12 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
.and_then(|bytes| bytes.try_into().ok()) .and_then(|bytes| bytes.try_into().ok())
.unwrap_or([0u8; 32]), .unwrap_or([0u8; 32]),
}; };
// Compute anchor hash // Compute anchor hash
let anchor_hash = snapshot.compute_anchor_hash(); let anchor_hash = snapshot.compute_anchor_hash();
info!("🔐 Computed anchor hash: {}", hex::encode(&anchor_hash)); info!("🔐 Computed anchor hash: {}", hex::encode(&anchor_hash));
// Create anchor record // Create anchor record
let anchor = Anchor { let anchor = Anchor {
id: None, id: None,
@ -170,25 +170,25 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
created_at: Utc::now(), created_at: Utc::now(),
status: "PENDING".to_string(), status: "PENDING".to_string(),
}; };
// Save to database // Save to database
let db_lock = db.lock().await; let db_lock = db.lock().await;
let anchor_id = db_lock.insert_anchor(&anchor).await?; let anchor_id = db_lock.insert_anchor(&anchor).await?;
drop(db_lock); drop(db_lock);
info!("💾 Anchor record created with id: {}", anchor_id); info!("💾 Anchor record created with id: {}", anchor_id);
// Create and broadcast Bitcoin transaction // Create and broadcast Bitcoin transaction
match create_and_broadcast_anchor_tx(&anchor_hash, &rpc_client, config).await { match create_and_broadcast_anchor_tx(&anchor_hash, &rpc_client, config).await {
Ok(txid) => { Ok(txid) => {
info!("✅ Anchor broadcasted! Txid: {}", txid); info!("✅ Anchor broadcasted! Txid: {}", txid);
// Update anchor with txid // Update anchor with txid
let db_lock = db.lock().await; let db_lock = db.lock().await;
// TODO: Add update_anchor method to Database // TODO: Add update_anchor method to Database
// For now, the anchor remains with status PENDING // For now, the anchor remains with status PENDING
drop(db_lock); drop(db_lock);
info!("🎉 Process {} successfully anchored on Bitcoin mainnet!", process.process_id); info!("🎉 Process {} successfully anchored on Bitcoin mainnet!", process.process_id);
} }
Err(e) => { Err(e) => {
@ -196,7 +196,7 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
} }
} }
} }
Ok(()) Ok(())
} }
@ -206,25 +206,25 @@ async fn create_and_broadcast_anchor_tx(
config: &Config, config: &Config,
) -> Result<Txid> { ) -> Result<Txid> {
info!("🔨 Creating anchor transaction..."); info!("🔨 Creating anchor transaction...");
// Load wallet // Load wallet
if let Err(e) = rpc.load_wallet(&config.bitcoin.wallet_name) { if let Err(e) = rpc.load_wallet(&config.bitcoin.wallet_name) {
warn!("Wallet load warning (may already be loaded): {}", e); warn!("Wallet load warning (may already be loaded): {}", e);
} }
// Get a new address from our wallet for change // Get a new address from our wallet for change
let change_address = rpc.get_new_address(None, None)?; let change_address = rpc.get_new_address(None, None)?;
// Create OP_RETURN output // Create OP_RETURN output
let op_return_script = create_op_return_anchor(anchor_hash); let op_return_script = create_op_return_anchor(anchor_hash);
let op_return_output = TxOut { let op_return_output = TxOut {
value: Amount::ZERO, value: Amount::ZERO,
script_pubkey: op_return_script, script_pubkey: op_return_script,
}; };
// We need at least one input to pay for fees // We need at least one input to pay for fees
// Use Bitcoin Core's fundrawtransaction to handle this // Use Bitcoin Core's fundrawtransaction to handle this
// Create a basic transaction with just the OP_RETURN output // Create a basic transaction with just the OP_RETURN output
let mut tx = Transaction { let mut tx = Transaction {
version: 2, version: 2,
@ -232,31 +232,31 @@ async fn create_and_broadcast_anchor_tx(
input: vec![], input: vec![],
output: vec![op_return_output], output: vec![op_return_output],
}; };
// Serialize the transaction // Serialize the transaction
let tx_hex = bitcoin::consensus::encode::serialize_hex(&tx); let tx_hex = bitcoin::consensus::encode::serialize_hex(&tx);
// Fund the transaction (adds inputs and change output) // Fund the transaction (adds inputs and change output)
let funded = rpc.fund_raw_transaction(&tx_hex, None, None)?; let funded = rpc.fund_raw_transaction(&tx_hex, None, None)?;
// Sign the transaction // Sign the transaction
let signed = rpc.sign_raw_transaction_with_wallet(&funded.hex, None, None)?; let signed = rpc.sign_raw_transaction_with_wallet(&funded.hex, None, None)?;
if !signed.complete { if !signed.complete {
return Err(anyhow!("Transaction signing incomplete")); return Err(anyhow!("Transaction signing incomplete"));
} }
// Test the transaction first // Test the transaction first
let test_result = rpc.test_mempool_accept(&[signed.hex.clone()])?; let test_result = rpc.test_mempool_accept(&[signed.hex.clone()])?;
if !test_result[0].allowed { if !test_result[0].allowed {
return Err(anyhow!("Transaction rejected by mempool: {:?}", test_result[0].reject_reason)); return Err(anyhow!("Transaction rejected by mempool: {:?}", test_result[0].reject_reason));
} }
// Broadcast // Broadcast
let txid = rpc.send_raw_transaction(&signed.hex)?; let txid = rpc.send_raw_transaction(&signed.hex)?;
info!("📡 Transaction broadcasted: {}", txid); info!("📡 Transaction broadcasted: {}", txid);
Ok(txid) Ok(txid)
} }
@ -264,7 +264,7 @@ fn create_op_return_anchor(hash: &[u8; 32]) -> ScriptBuf {
let mut data = Vec::new(); let mut data = Vec::new();
data.extend_from_slice(b"NKA1"); // Protocol identifier: 4NK Anchor v1 data.extend_from_slice(b"NKA1"); // Protocol identifier: 4NK Anchor v1
data.extend_from_slice(hash); data.extend_from_slice(hash);
ScriptBuf::new_op_return(&data) ScriptBuf::new_op_return(&data)
} }
@ -280,7 +280,7 @@ pub fn verify_anchor(tx: &Transaction, expected_hash: &[u8; 32]) -> bool {
} else { } else {
3 // Multi-byte push 3 // Multi-byte push
}; };
if script_bytes.len() >= data_start + 36 { if script_bytes.len() >= data_start + 36 {
let protocol_id = &script_bytes[data_start..data_start + 4]; let protocol_id = &script_bytes[data_start..data_start + 4];
if protocol_id == b"NKA1" { if protocol_id == b"NKA1" {

View File

@ -8,22 +8,22 @@ use crate::db::Database;
lazy_static! { lazy_static! {
static ref REGISTRY: Registry = Registry::new(); static ref REGISTRY: Registry = Registry::new();
static ref PROCESSES_TOTAL: IntGauge = IntGauge::new( static ref PROCESSES_TOTAL: IntGauge = IntGauge::new(
"4nk_certificator_processes_total", "4nk_certificator_processes_total",
"Total number of monitored processes" "Total number of monitored processes"
).unwrap(); ).unwrap();
static ref ANCHORS_CREATED: IntCounter = IntCounter::new( static ref ANCHORS_CREATED: IntCounter = IntCounter::new(
"4nk_certificator_anchors_created_total", "4nk_certificator_anchors_created_total",
"Total number of anchors created" "Total number of anchors created"
).unwrap(); ).unwrap();
static ref ANCHORS_CONFIRMED: IntCounter = IntCounter::new( static ref ANCHORS_CONFIRMED: IntCounter = IntCounter::new(
"4nk_certificator_anchors_confirmed_total", "4nk_certificator_anchors_confirmed_total",
"Total number of anchors confirmed on-chain" "Total number of anchors confirmed on-chain"
).unwrap(); ).unwrap();
static ref DATA_VOLUME_BYTES: IntCounter = IntCounter::new( static ref DATA_VOLUME_BYTES: IntCounter = IntCounter::new(
"4nk_certificator_data_volume_bytes", "4nk_certificator_data_volume_bytes",
"Total data volume monitored in bytes" "Total data volume monitored in bytes"
@ -43,18 +43,18 @@ pub async fn metrics_handler(db: web::Data<Arc<Mutex<Database>>>) -> HttpRespons
if let Ok(processes) = db_lock.get_all_processes().await { if let Ok(processes) = db_lock.get_all_processes().await {
PROCESSES_TOTAL.set(processes.len() as i64); PROCESSES_TOTAL.set(processes.len() as i64);
} }
// TODO: Count anchors and data volume // TODO: Count anchors and data volume
} }
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let metric_families = REGISTRY.gather(); let metric_families = REGISTRY.gather();
let mut buffer = vec![]; let mut buffer = vec![];
if let Err(e) = encoder.encode(&metric_families, &mut buffer) { if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
return HttpResponse::InternalServerError().body(format!("Failed to encode metrics: {}", e)); return HttpResponse::InternalServerError().body(format!("Failed to encode metrics: {}", e));
} }
HttpResponse::Ok() HttpResponse::Ok()
.content_type("text/plain; version=0.0.4") .content_type("text/plain; version=0.0.4")
.body(buffer) .body(buffer)
@ -71,4 +71,3 @@ pub fn increment_anchors_confirmed() {
pub fn add_data_volume(bytes: u64) { pub fn add_data_volume(bytes: u64) {
DATA_VOLUME_BYTES.inc_by(bytes); DATA_VOLUME_BYTES.inc_by(bytes);
} }

View File

@ -10,30 +10,30 @@ pub async fn get_payments(
) -> HttpResponse { ) -> HttpResponse {
let address = path.into_inner(); let address = path.into_inner();
let db = db.lock().await; let db = db.lock().await;
// Get all processes with this address // Get all processes with this address
match db.get_all_processes().await { match db.get_all_processes().await {
Ok(processes) => { Ok(processes) => {
let matching_processes: Vec<_> = processes.iter() let matching_processes: Vec<_> = processes.iter()
.filter(|p| p.btc_address.as_ref() == Some(&address)) .filter(|p| p.btc_address.as_ref() == Some(&address))
.collect(); .collect();
if matching_processes.is_empty() { if matching_processes.is_empty() {
return HttpResponse::NotFound().json(serde_json::json!({ return HttpResponse::NotFound().json(serde_json::json!({
"error": "No processes found with this address" "error": "No processes found with this address"
})); }));
} }
let mut all_payments = vec![]; let mut all_payments = vec![];
let mut total_received: i64 = 0; let mut total_received: i64 = 0;
// TODO: Add get_payments_for_address method to Database // TODO: Add get_payments_for_address method to Database
// For now, return process info // For now, return process info
for process in matching_processes { for process in matching_processes {
total_received += process.total_paid_sats; total_received += process.total_paid_sats;
} }
HttpResponse::Ok().json(serde_json::json!({ HttpResponse::Ok().json(serde_json::json!({
"address": address, "address": address,
"total_received_sats": total_received, "total_received_sats": total_received,
@ -46,4 +46,3 @@ pub async fn get_payments(
})) }))
} }
} }

View File

@ -22,16 +22,16 @@ pub async fn verify_anchor_endpoint(
"error": format!("Invalid anchor hash: {}", e) "error": format!("Invalid anchor hash: {}", e)
})) }))
}; };
if anchor_hash_bytes.len() != 32 { if anchor_hash_bytes.len() != 32 {
return HttpResponse::BadRequest().json(serde_json::json!({ return HttpResponse::BadRequest().json(serde_json::json!({
"error": "Anchor hash must be 32 bytes" "error": "Anchor hash must be 32 bytes"
})); }));
} }
let mut hash_array = [0u8; 32]; let mut hash_array = [0u8; 32];
hash_array.copy_from_slice(&anchor_hash_bytes); hash_array.copy_from_slice(&anchor_hash_bytes);
// Parse txid // Parse txid
let txid = match req.txid.parse() { let txid = match req.txid.parse() {
Ok(t) => t, Ok(t) => t,
@ -39,7 +39,7 @@ pub async fn verify_anchor_endpoint(
"error": format!("Invalid txid: {}", e) "error": format!("Invalid txid: {}", e)
})) }))
}; };
// Connect to Bitcoin RPC // Connect to Bitcoin RPC
let rpc_client = match Client::new( let rpc_client = match Client::new(
&config.bitcoin.rpc_url, &config.bitcoin.rpc_url,
@ -53,7 +53,7 @@ pub async fn verify_anchor_endpoint(
"error": format!("Failed to connect to Bitcoin RPC: {}", e) "error": format!("Failed to connect to Bitcoin RPC: {}", e)
})) }))
}; };
// Get transaction // Get transaction
let tx_info = match rpc_client.get_raw_transaction_info(&txid, None) { let tx_info = match rpc_client.get_raw_transaction_info(&txid, None) {
Ok(info) => info, Ok(info) => info,
@ -65,20 +65,20 @@ pub async fn verify_anchor_endpoint(
} }
})) }))
}; };
let tx = match tx_info.transaction() { let tx = match tx_info.transaction() {
Some(t) => t, Some(t) => t,
None => return HttpResponse::InternalServerError().json(serde_json::json!({ None => return HttpResponse::InternalServerError().json(serde_json::json!({
"error": "Failed to decode transaction" "error": "Failed to decode transaction"
})) }))
}; };
// Verify anchor // Verify anchor
let hash_matches = verify_anchor(&tx, &hash_array); let hash_matches = verify_anchor(&tx, &hash_array);
let confirmations = tx_info.confirmations.unwrap_or(0); let confirmations = tx_info.confirmations.unwrap_or(0);
let block_height = tx_info.blockheight; let block_height = tx_info.blockheight;
HttpResponse::Ok().json(serde_json::json!({ HttpResponse::Ok().json(serde_json::json!({
"valid": hash_matches, "valid": hash_matches,
"details": { "details": {
@ -90,4 +90,3 @@ pub async fn verify_anchor_endpoint(
} }
})) }))
} }

View File

@ -32,11 +32,11 @@ async fn main() -> Result<()> {
// Run migrations // Run migrations
database.run_migrations().await?; database.run_migrations().await?;
info!("✅ Database migrations applied"); info!("✅ Database migrations applied");
// Initialize Prometheus metrics // Initialize Prometheus metrics
api::metrics_prometheus::init_metrics(); api::metrics_prometheus::init_metrics();
info!("📊 Prometheus metrics initialized"); info!("📊 Prometheus metrics initialized");
// Wrap database in Arc<Mutex> for shared state // Wrap database in Arc<Mutex> for shared state
let db = Arc::new(Mutex::new(database)); let db = Arc::new(Mutex::new(database));
@ -48,7 +48,7 @@ async fn main() -> Result<()> {
monitor::start_monitoring(config, db).await monitor::start_monitoring(config, db).await
} }
}); });
let payment_handle = tokio::spawn({ let payment_handle = tokio::spawn({
let config = config.clone(); let config = config.clone();
let db = db.clone(); let db = db.clone();
@ -56,7 +56,7 @@ async fn main() -> Result<()> {
payment_watcher::start_payment_watching(config, db).await payment_watcher::start_payment_watching(config, db).await
} }
}); });
let anchor_handle = tokio::spawn({ let anchor_handle = tokio::spawn({
let config = config.clone(); let config = config.clone();
let db = db.clone(); let db = db.clone();

View File

@ -14,7 +14,7 @@ use crate::models::{Process, Metric, PriceConfig};
pub async fn start_monitoring(config: Config, db: Arc<Mutex<Database>>) -> Result<()> { pub async fn start_monitoring(config: Config, db: Arc<Mutex<Database>>) -> Result<()> {
info!("📡 Starting relay monitoring..."); info!("📡 Starting relay monitoring...");
loop { loop {
match monitor_relay(&config, &db).await { match monitor_relay(&config, &db).await {
Ok(_) => { Ok(_) => {
@ -30,15 +30,15 @@ pub async fn start_monitoring(config: Config, db: Arc<Mutex<Database>>) -> Resul
async fn monitor_relay(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()> { async fn monitor_relay(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()> {
info!("🔌 Connecting to relay at {}", config.relay.websocket_url); info!("🔌 Connecting to relay at {}", config.relay.websocket_url);
let (ws_stream, _) = connect_async(&config.relay.websocket_url) let (ws_stream, _) = connect_async(&config.relay.websocket_url)
.await .await
.map_err(|e| anyhow!("Failed to connect to relay: {}", e))?; .map_err(|e| anyhow!("Failed to connect to relay: {}", e))?;
info!("✅ Connected to relay WebSocket"); info!("✅ Connected to relay WebSocket");
let (mut write, mut read) = ws_stream.split(); let (mut write, mut read) = ws_stream.split();
// Send a ping to keep connection alive // Send a ping to keep connection alive
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -49,7 +49,7 @@ async fn monitor_relay(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()>
sleep(Duration::from_secs(30)).await; sleep(Duration::from_secs(30)).await;
} }
}); });
// Listen for messages // Listen for messages
while let Some(msg) = read.next().await { while let Some(msg) = read.next().await {
match msg { match msg {
@ -75,19 +75,19 @@ async fn monitor_relay(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()>
_ => {} _ => {}
} }
} }
Ok(()) Ok(())
} }
async fn handle_relay_message(text: &str, db: &Arc<Mutex<Database>>) -> Result<()> { async fn handle_relay_message(text: &str, db: &Arc<Mutex<Database>>) -> Result<()> {
let envelope: Value = serde_json::from_str(text)?; let envelope: Value = serde_json::from_str(text)?;
let flag = envelope.get("flag") let flag = envelope.get("flag")
.and_then(|f| f.as_str()) .and_then(|f| f.as_str())
.unwrap_or("Unknown"); .unwrap_or("Unknown");
debug!("📨 Received message with flag: {}", flag); debug!("📨 Received message with flag: {}", flag);
match flag { match flag {
"Handshake" => { "Handshake" => {
handle_handshake(&envelope, db).await?; handle_handshake(&envelope, db).await?;
@ -105,7 +105,7 @@ async fn handle_relay_message(text: &str, db: &Arc<Mutex<Database>>) -> Result<(
.and_then(|c| c.as_str()) .and_then(|c| c.as_str())
.map(|s| s.len()) .map(|s| s.len())
.unwrap_or(0); .unwrap_or(0);
debug!("Data message size: {} bytes", content_size); debug!("Data message size: {} bytes", content_size);
// TODO: Attribute to specific process if possible // TODO: Attribute to specific process if possible
} }
@ -113,26 +113,26 @@ async fn handle_relay_message(text: &str, db: &Arc<Mutex<Database>>) -> Result<(
debug!("Unhandled message flag: {}", flag); debug!("Unhandled message flag: {}", flag);
} }
} }
Ok(()) Ok(())
} }
async fn handle_handshake(envelope: &Value, db: &Arc<Mutex<Database>>) -> Result<()> { async fn handle_handshake(envelope: &Value, db: &Arc<Mutex<Database>>) -> Result<()> {
info!("🤝 Processing Handshake message"); info!("🤝 Processing Handshake message");
let content = envelope.get("content") let content = envelope.get("content")
.and_then(|c| c.as_str()) .and_then(|c| c.as_str())
.ok_or_else(|| anyhow!("No content in handshake"))?; .ok_or_else(|| anyhow!("No content in handshake"))?;
let handshake: Value = serde_json::from_str(content)?; let handshake: Value = serde_json::from_str(content)?;
// Extract processes list // Extract processes list
if let Some(processes_list) = handshake.get("processes_list") { if let Some(processes_list) = handshake.get("processes_list") {
if let Some(processes_map) = processes_list.as_object() { if let Some(processes_map) = processes_list.as_object() {
info!("📦 Found {} processes in handshake", processes_map.len()); info!("📦 Found {} processes in handshake", processes_map.len());
let db_lock = db.lock().await; let db_lock = db.lock().await;
for (process_id, process_data) in processes_map { for (process_id, process_data) in processes_map {
// Extract process information // Extract process information
if let Err(e) = sync_process_from_handshake(process_id, process_data, &db_lock).await { if let Err(e) = sync_process_from_handshake(process_id, process_data, &db_lock).await {
@ -141,7 +141,7 @@ async fn handle_handshake(envelope: &Value, db: &Arc<Mutex<Database>>) -> Result
} }
} }
} }
Ok(()) Ok(())
} }
@ -151,10 +151,10 @@ async fn sync_process_from_handshake(
db: &Database, db: &Database,
) -> Result<()> { ) -> Result<()> {
debug!("Syncing process: {}", process_id); debug!("Syncing process: {}", process_id);
// Try to extract price configuration from public_data or roles // Try to extract price configuration from public_data or roles
let price_config = extract_price_config(process_data); let price_config = extract_price_config(process_data);
let process = Process { let process = Process {
process_id: process_id.to_string(), process_id: process_id.to_string(),
created_at: Utc::now(), created_at: Utc::now(),
@ -165,17 +165,17 @@ async fn sync_process_from_handshake(
total_paid_sats: 0, total_paid_sats: 0,
state_merkle_root: None, state_merkle_root: None,
}; };
db.upsert_process(&process).await?; db.upsert_process(&process).await?;
info!("✅ Synced process: {}", process_id); info!("✅ Synced process: {}", process_id);
Ok(()) Ok(())
} }
fn extract_price_config(process_data: &Value) -> Option<PriceConfig> { fn extract_price_config(process_data: &Value) -> Option<PriceConfig> {
// Try to find price configuration in the process data // Try to find price configuration in the process data
// It could be in public_data or in the process state // It could be in public_data or in the process state
if let Some(states) = process_data.get("states").and_then(|s| s.as_array()) { if let Some(states) = process_data.get("states").and_then(|s| s.as_array()) {
for state in states { for state in states {
if let Some(public_data) = state.get("public_data") { if let Some(public_data) = state.get("public_data") {
@ -185,23 +185,23 @@ fn extract_price_config(process_data: &Value) -> Option<PriceConfig> {
} }
} }
} }
// Also check root level // Also check root level
if let Some(price) = process_data.get("price") { if let Some(price) = process_data.get("price") {
return parse_price_config(price); return parse_price_config(price);
} }
None None
} }
fn parse_price_config(price_value: &Value) -> Option<PriceConfig> { fn parse_price_config(price_value: &Value) -> Option<PriceConfig> {
let price_mo_sats = price_value.get("priceMoSats") let price_mo_sats = price_value.get("priceMoSats")
.and_then(|v| v.as_u64())?; .and_then(|v| v.as_u64())?;
let btc_address = price_value.get("btcAddress") let btc_address = price_value.get("btcAddress")
.and_then(|v| v.as_str())? .and_then(|v| v.as_str())?
.to_string(); .to_string();
Some(PriceConfig { Some(PriceConfig {
price_mo_sats, price_mo_sats,
btc_address, btc_address,
@ -210,20 +210,20 @@ fn parse_price_config(price_value: &Value) -> Option<PriceConfig> {
async fn handle_commit_message(envelope: &Value, db: &Arc<Mutex<Database>>) -> Result<()> { async fn handle_commit_message(envelope: &Value, db: &Arc<Mutex<Database>>) -> Result<()> {
debug!("📝 Processing Commit message"); debug!("📝 Processing Commit message");
let content = envelope.get("content") let content = envelope.get("content")
.and_then(|c| c.as_str()) .and_then(|c| c.as_str())
.ok_or_else(|| anyhow!("No content in commit"))?; .ok_or_else(|| anyhow!("No content in commit"))?;
let commit_msg: Value = serde_json::from_str(content)?; let commit_msg: Value = serde_json::from_str(content)?;
let process_id = commit_msg.get("process_id") let process_id = commit_msg.get("process_id")
.and_then(|p| p.as_str()) .and_then(|p| p.as_str())
.ok_or_else(|| anyhow!("No process_id in commit"))?; .ok_or_else(|| anyhow!("No process_id in commit"))?;
// Estimate data size from the commit message // Estimate data size from the commit message
let data_size = content.len() as i64; let data_size = content.len() as i64;
// Record metric // Record metric
let metric = Metric { let metric = Metric {
id: None, id: None,
@ -234,11 +234,11 @@ async fn handle_commit_message(envelope: &Value, db: &Arc<Mutex<Database>>) -> R
bytes_received: 0, bytes_received: 0,
message_count: 1, message_count: 1,
}; };
let db_lock = db.lock().await; let db_lock = db.lock().await;
db_lock.insert_metric(&metric).await?; db_lock.insert_metric(&metric).await?;
debug!("📊 Recorded metric for process {}: {} bytes", process_id, data_size); debug!("📊 Recorded metric for process {}: {} bytes", process_id, data_size);
Ok(()) Ok(())
} }

View File

@ -14,7 +14,7 @@ use crate::models::Payment;
pub async fn start_payment_watching(config: Config, db: Arc<Mutex<Database>>) -> Result<()> { pub async fn start_payment_watching(config: Config, db: Arc<Mutex<Database>>) -> Result<()> {
info!("💰 Starting payment watcher..."); info!("💰 Starting payment watcher...");
loop { loop {
match watch_payments(&config, &db).await { match watch_payments(&config, &db).await {
Ok(_) => { Ok(_) => {
@ -30,7 +30,7 @@ pub async fn start_payment_watching(config: Config, db: Arc<Mutex<Database>>) ->
async fn watch_payments(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()> { async fn watch_payments(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()> {
info!("🔌 Connecting to Bitcoin RPC at {}", config.bitcoin.rpc_url); info!("🔌 Connecting to Bitcoin RPC at {}", config.bitcoin.rpc_url);
let rpc_client = Client::new( let rpc_client = Client::new(
&config.bitcoin.rpc_url, &config.bitcoin.rpc_url,
Auth::UserPass( Auth::UserPass(
@ -38,40 +38,40 @@ async fn watch_payments(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()
config.bitcoin.rpc_password.clone(), config.bitcoin.rpc_password.clone(),
), ),
)?; )?;
info!("✅ Connected to Bitcoin RPC"); info!("✅ Connected to Bitcoin RPC");
// Verify connection // Verify connection
let blockchain_info = rpc_client.get_blockchain_info()?; let blockchain_info = rpc_client.get_blockchain_info()?;
info!("📊 Bitcoin network: {}, blocks: {}", info!("📊 Bitcoin network: {}, blocks: {}",
blockchain_info.chain, blockchain_info.chain,
blockchain_info.blocks); blockchain_info.blocks);
let mut last_block_height = blockchain_info.blocks; let mut last_block_height = blockchain_info.blocks;
loop { loop {
// Check for new blocks // Check for new blocks
let current_info = rpc_client.get_blockchain_info()?; let current_info = rpc_client.get_blockchain_info()?;
let current_height = current_info.blocks; let current_height = current_info.blocks;
if current_height > last_block_height { if current_height > last_block_height {
info!("🔔 New block detected: {} -> {}", last_block_height, current_height); info!("🔔 New block detected: {} -> {}", last_block_height, current_height);
// Scan new blocks for payments to our watched addresses // Scan new blocks for payments to our watched addresses
for height in (last_block_height + 1)..=current_height { for height in (last_block_height + 1)..=current_height {
if let Err(e) = scan_block_for_payments(height, &rpc_client, db, config).await { if let Err(e) = scan_block_for_payments(height, &rpc_client, db, config).await {
error!("Failed to scan block {}: {}", height, e); error!("Failed to scan block {}: {}", height, e);
} }
} }
last_block_height = current_height; last_block_height = current_height;
} }
// Check mempool for pending payments // Check mempool for pending payments
if let Err(e) = scan_mempool_for_payments(&rpc_client, db).await { if let Err(e) = scan_mempool_for_payments(&rpc_client, db).await {
warn!("Failed to scan mempool: {}", e); warn!("Failed to scan mempool: {}", e);
} }
sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await; sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await;
} }
} }
@ -83,14 +83,14 @@ async fn scan_block_for_payments(
config: &Config, config: &Config,
) -> Result<()> { ) -> Result<()> {
debug!("🔍 Scanning block {} for payments", block_height); debug!("🔍 Scanning block {} for payments", block_height);
let block_hash = rpc.get_block_hash(block_height)?; let block_hash = rpc.get_block_hash(block_height)?;
let block = rpc.get_block(&block_hash)?; let block = rpc.get_block(&block_hash)?;
let db_lock = db.lock().await; let db_lock = db.lock().await;
let processes = db_lock.get_all_processes().await?; let processes = db_lock.get_all_processes().await?;
drop(db_lock); drop(db_lock);
// Get all addresses we're watching // Get all addresses we're watching
let watched_addresses: Vec<(String, Address)> = processes.iter() let watched_addresses: Vec<(String, Address)> = processes.iter()
.filter_map(|p| { .filter_map(|p| {
@ -101,13 +101,13 @@ async fn scan_block_for_payments(
}) })
}) })
.collect(); .collect();
if watched_addresses.is_empty() { if watched_addresses.is_empty() {
return Ok(()); return Ok(());
} }
debug!("👀 Watching {} addresses", watched_addresses.len()); debug!("👀 Watching {} addresses", watched_addresses.len());
// Scan all transactions in the block // Scan all transactions in the block
for tx in &block.txdata { for tx in &block.txdata {
for (vout_idx, output) in tx.output.iter().enumerate() { for (vout_idx, output) in tx.output.iter().enumerate() {
@ -116,10 +116,10 @@ async fn scan_block_for_payments(
for (process_id, watched_addr) in &watched_addresses { for (process_id, watched_addr) in &watched_addresses {
if addr == *watched_addr { if addr == *watched_addr {
let amount_sats = output.value.to_sat() as i64; let amount_sats = output.value.to_sat() as i64;
info!("💸 Payment detected! Process: {}, Amount: {} sats, Tx: {}", info!("💸 Payment detected! Process: {}, Amount: {} sats, Tx: {}",
process_id, amount_sats, tx.txid()); process_id, amount_sats, tx.txid());
// Record payment // Record payment
let payment = Payment { let payment = Payment {
id: None, id: None,
@ -131,7 +131,7 @@ async fn scan_block_for_payments(
block_height: Some(block_height as i32), block_height: Some(block_height as i32),
confirmations: 1, confirmations: 1,
}; };
let db_lock = db.lock().await; let db_lock = db.lock().await;
if let Err(e) = db_lock.insert_payment(&payment).await { if let Err(e) = db_lock.insert_payment(&payment).await {
error!("Failed to insert payment: {}", e); error!("Failed to insert payment: {}", e);
@ -140,17 +140,17 @@ async fn scan_block_for_payments(
if let Ok(Some(mut process)) = db_lock.get_process(process_id).await { if let Ok(Some(mut process)) = db_lock.get_process(process_id).await {
process.total_paid_sats += amount_sats; process.total_paid_sats += amount_sats;
process.payment_status = if confirmations_sufficient( process.payment_status = if confirmations_sufficient(
1, 1,
config.bitcoin.min_confirmations config.bitcoin.min_confirmations
) { ) {
"PAID".to_string() "PAID".to_string()
} else { } else {
"PENDING".to_string() "PENDING".to_string()
}; };
let _ = db_lock.upsert_process(&process).await; let _ = db_lock.upsert_process(&process).await;
info!("✅ Updated process {} payment status: {}", info!("✅ Updated process {} payment status: {}",
process_id, process.payment_status); process_id, process.payment_status);
} }
} }
@ -160,7 +160,7 @@ async fn scan_block_for_payments(
} }
} }
} }
Ok(()) Ok(())
} }
@ -170,17 +170,17 @@ async fn scan_mempool_for_payments(
) -> Result<()> { ) -> Result<()> {
// Get mempool transactions // Get mempool transactions
let mempool_txids = rpc.get_raw_mempool()?; let mempool_txids = rpc.get_raw_mempool()?;
if mempool_txids.is_empty() { if mempool_txids.is_empty() {
return Ok(()); return Ok(());
} }
debug!("🔍 Scanning {} mempool transactions", mempool_txids.len()); debug!("🔍 Scanning {} mempool transactions", mempool_txids.len());
let db_lock = db.lock().await; let db_lock = db.lock().await;
let processes = db_lock.get_all_processes().await?; let processes = db_lock.get_all_processes().await?;
drop(db_lock); drop(db_lock);
// Get all addresses we're watching // Get all addresses we're watching
let watched_addresses: Vec<(String, Address)> = processes.iter() let watched_addresses: Vec<(String, Address)> = processes.iter()
.filter_map(|p| { .filter_map(|p| {
@ -191,11 +191,11 @@ async fn scan_mempool_for_payments(
}) })
}) })
.collect(); .collect();
if watched_addresses.is_empty() { if watched_addresses.is_empty() {
return Ok(()); return Ok(());
} }
// Scan transactions (limit to avoid overload) // Scan transactions (limit to avoid overload)
for txid in mempool_txids.iter().take(100) { for txid in mempool_txids.iter().take(100) {
if let Ok(tx_info) = rpc.get_raw_transaction_info(txid, None) { if let Ok(tx_info) = rpc.get_raw_transaction_info(txid, None) {
@ -205,10 +205,10 @@ async fn scan_mempool_for_payments(
for (process_id, watched_addr) in &watched_addresses { for (process_id, watched_addr) in &watched_addresses {
if addr == *watched_addr { if addr == *watched_addr {
let amount_sats = output.value.to_sat() as i64; let amount_sats = output.value.to_sat() as i64;
info!("💸 Pending payment in mempool! Process: {}, Amount: {} sats", info!("💸 Pending payment in mempool! Process: {}, Amount: {} sats",
process_id, amount_sats); process_id, amount_sats);
// Update process status to PENDING // Update process status to PENDING
let db_lock = db.lock().await; let db_lock = db.lock().await;
if let Ok(Some(mut process)) = db_lock.get_process(process_id).await { if let Ok(Some(mut process)) = db_lock.get_process(process_id).await {
@ -225,11 +225,10 @@ async fn scan_mempool_for_payments(
} }
} }
} }
Ok(()) Ok(())
} }
fn confirmations_sufficient(current: u32, required: u32) -> bool { fn confirmations_sufficient(current: u32, required: u32) -> bool {
current >= required current >= required
} }