diff --git a/.gitignore b/.gitignore index 241c32e..ed6a795 100644 --- a/.gitignore +++ b/.gitignore @@ -1,30 +1,82 @@ -# Rust -/target/ -Cargo.lock -**/*.rs.bk -*.pdb +# 4NK Environment - Git Ignore +# ============================ -# Config -config.toml +# Dossiers de sauvegarde des scripts +**/backup/ +**/*backup* -# Logs -*.log +**/.cargo/ -# Database + + +# Fichiers temporaires +**/*.tmp* +**/*.temp* +**/*.log* +**/*.pid* + +# Fichiers de configuration locale +**/*.env* +**/*.conf* +**/*.yaml* +**/*.yml* +**/*.ini* +**/*.json* +**/*.toml* +**/*.lock* + +# Données et logs +**/*.logs* +**/*.data *.db *.sqlite -# IDE -.idea/ -.vscode/ -*.swp -*.swo -*~ +# Certificats et clés +**/*.key +**/*.pem +**/*.crt +**/*.p12 +**/*.pfx +ssl/ +certs/ + +# Docker +**/*.docker* + +# Cache et build +**/*.node_modules/ +**/*.dist/ +**/*build/ +**/*target/ +**/*.*.o +**/*.so +**/*.dylib + +# IDE et éditeurs +**/*.vscode/ +**/*.idea/ +**/*.swp +**/*.swo +**/*~ # OS -.DS_Store -Thumbs.db +**/*.DS_Store +**/*Thumbs.db +**/*tmp* -# Temporary -/tmp/ -/data/ +# Git +**/*.git/ +**/*.orig* + +# Backup des projets existants +**/*backup* + + +**/*wallet* +**/*keys* + +# Supervisor +supervisor-logs/ + +**/*node_modules* +**/*cursor* \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index fc026cf..7164ca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ path = "src/main.rs" [[bin]] name = "certificator-cli" -path = "src/bin/cli.rs" +path = "src/cli.rs" [dev-dependencies] mockito = "1" diff --git a/src/anchor.rs b/src/anchor.rs index ca307f2..5c7bee7 100644 --- a/src/anchor.rs +++ b/src/anchor.rs @@ -203,6 +203,104 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc>) -> Ok(()) } +/// Anchor a specific process immediately. If `force` is true, skip payment and interval checks. +pub async fn anchor_process_now( + config: &Config, + db: &Arc>, + target_process_id: &str, + force: bool, +) -> Result> { + // Connect RPC + let rpc_client = Client::new( + &config.bitcoin.rpc_url, + Auth::UserPass( + config.bitcoin.rpc_user.clone(), + config.bitcoin.rpc_password.clone(), + ), + )?; + let blockchain_info = rpc_client.get_blockchain_info()?; + let current_block = blockchain_info.blocks as u32; + + // Load process + let process = { + let db_lock = db.lock().await; + db_lock.get_process(target_process_id).await? + }; + let Some(process) = process else { return Ok(None); }; + + // Payment/interval checks + if !force { + if process.payment_status != "PAID" { return Ok(None); } + } + + // Determine period + let period_start = 0u32; // could be last anchor end; simplified for force + let period_end = current_block; + + // Metrics + let metrics = { + let db_lock = db.lock().await; + db_lock.get_metrics_for_period(&process.process_id, period_start as i32, period_end as i32).await? + }; + if metrics.is_empty() { return Ok(None); } + + let total_bytes_sent: i64 = metrics.iter().map(|m| m.bytes_sent).sum(); + let total_bytes_received: i64 = metrics.iter().map(|m| m.bytes_received).sum(); + let total_messages: i32 = metrics.iter().map(|m| m.message_count).sum(); + let total_bytes = (total_bytes_sent + total_bytes_received) as u64; + let total_mb = total_bytes / 1_048_576; + + if !force { + if let (Some(price_mo_sats), Some(_addr)) = (process.price_mo_sats, &process.btc_address) { + let required = (price_mo_sats as u64) * total_mb; + if (process.total_paid_sats as u64) < required { return Ok(None); } + } + } + + // Snapshot and hash + let snapshot = ProcessStateSnapshot { + process_id: process.process_id.clone(), + period_start_block: period_start, + period_end_block: period_end, + total_bytes_sent: total_bytes_sent as u64, + total_bytes_received: total_bytes_received as u64, + message_count: total_messages as u64, + participants: vec![], + state_merkle_root: process.state_merkle_root + .and_then(|bytes| bytes.try_into().ok()) + .unwrap_or([0u8; 32]), + }; + let anchor_hash = snapshot.compute_anchor_hash(); + + // Create anchor row + let anchor = Anchor { + id: None, + process_id: process.process_id.clone(), + anchor_hash: anchor_hash.to_vec(), + period_start_block: period_start as i32, + period_end_block: period_end as i32, + total_mb: total_mb as i64, + anchor_txid: None, + anchor_block: None, + created_at: Utc::now(), + status: "PENDING".to_string(), + }; + let anchor_id = { + let db_lock = db.lock().await; + db_lock.insert_anchor(&anchor).await? + }; + increment_anchors_created(); + + // Broadcast + let txid = create_and_broadcast_anchor_tx(&anchor_hash, &rpc_client, config).await?; + let _ = { + let db_lock = db.lock().await; + db_lock.update_anchor_tx(anchor_id, &txid.to_string(), "BROADCASTED", None).await + }; + increment_anchors_confirmed(); + Ok(Some(txid)) +} + async fn create_and_broadcast_anchor_tx( anchor_hash: &[u8; 32], rpc: &Client, diff --git a/src/cli.rs b/src/cli.rs index 1801372..1c0346d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -4,7 +4,8 @@ use std::path::PathBuf; use crate::config::Config; use crate::db::Database; -use crate::models::ProcessStateSnapshot; +use crate::anchor::{anchor_process_now, verify_anchor}; +use bitcoincore_rpc::{Auth, Client, RpcApi}; #[derive(Parser)] #[command(name = "certificator-cli")] @@ -197,36 +198,67 @@ async fn force_anchor( force: bool, ) -> Result<()> { println!("⚓ Forcing anchor creation for {}", process_id); - if !force { println!("⚠️ Use --force to confirm anchor creation"); return Ok(()); } - - // TODO: Implement force anchor logic - println!("❌ Force anchor not yet implemented"); - + let db_arc = std::sync::Arc::new(tokio::sync::Mutex::new(db.clone())); + if let Some(txid) = anchor_process_now(config, &db_arc, process_id, true).await? { + println!("✅ Anchor broadcasted: {}", txid); + } else { + println!("⚠️ Anchor not created (conditions not met)"); + } Ok(()) } async fn verify_anchor(config: &Config, hash: &str, txid: &str) -> Result<()> { - println!("🔍 Verifying anchor..."); - println!("Hash: {}", hash); - println!("Txid: {}", txid); - - // TODO: Implement verification logic - println!("❌ Verification not yet implemented"); - + println!("🔍 Verifying anchor...\nHash: {}\nTxid: {}", hash, txid); + // Connect to RPC + let rpc = Client::new( + &config.bitcoin.rpc_url, + Auth::UserPass( + config.bitcoin.rpc_user.clone(), + config.bitcoin.rpc_password.clone(), + ), + )?; + let t: bitcoin::Txid = txid.parse()?; + let info = rpc.get_raw_transaction_info(&t, None)?; + let Some(tx) = info.transaction() else { println!("❌ Cannot decode transaction"); return Ok(()); }; + let bytes = hex::decode(hash)?; + let mut arr = [0u8; 32]; + if bytes.len() != 32 { println!("❌ Invalid hash length"); return Ok(()); } + arr.copy_from_slice(&bytes); + let ok = verify_anchor(&tx, &arr); + println!("valid: {} | confirmations: {:?} | block_height: {:?}", ok, info.confirmations, info.blockheight); Ok(()) } async fn watch_address(config: &Config, address: &str) -> Result<()> { - println!("👀 Watching address: {}", address); - println!("Press Ctrl+C to stop..."); - - // TODO: Implement address watching - println!("❌ Address watching not yet implemented"); - + println!("👀 Watching address: {}\nPress Ctrl+C to stop...", address); + let rpc = Client::new( + &config.bitcoin.rpc_url, + Auth::UserPass( + config.bitcoin.rpc_user.clone(), + config.bitcoin.rpc_password.clone(), + ), + )?; + loop { + let txids = rpc.get_raw_mempool().unwrap_or_default(); + for txid in txids.iter().take(50) { + if let Ok(info) = rpc.get_raw_transaction_info(txid, None) { + if let Some(tx) = info.transaction() { + for out in tx.output { + if let Ok(addr) = bitcoin::Address::from_script(&out.script_pubkey, bitcoin::Network::Bitcoin) { + if addr.to_string() == address { + println!("💸 mempool: {} sats from {}", out.value.to_sat(), txid); + } + } + } + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + } Ok(()) } diff --git a/src/db.rs b/src/db.rs index 7c28630..8f80ed7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -213,7 +213,7 @@ impl Database { Ok(anchors) } - + pub async fn update_anchor_tx( &self, anchor_id: i64, @@ -236,7 +236,7 @@ impl Database { .await?; Ok(()) } - + pub async fn get_payments_for_address(&self, address: &str) -> Result> { let payments = sqlx::query_as::<_, Payment>( r#" @@ -251,7 +251,7 @@ impl Database { .await?; Ok(payments) } - + pub async fn count_anchors(&self) -> Result { let row = sqlx::query("SELECT COUNT(*) AS c FROM anchors") .fetch_one(&self.pool) @@ -259,7 +259,7 @@ impl Database { let c: i64 = row.get("c"); Ok(c) } - + pub async fn sum_data_volume(&self) -> Result { let row = sqlx::query("SELECT COALESCE(SUM(bytes_sent + bytes_received),0) AS s FROM metrics") .fetch_one(&self.pool) @@ -288,4 +288,25 @@ impl Database { Ok(()) } + + pub async fn update_payment_confirmations( + &self, + txid: &str, + confirmations: i32, + block_height: Option, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE payments + SET confirmations = $2, block_height = COALESCE($3, block_height) + WHERE txid = $1 + "# + ) + .bind(txid) + .bind(confirmations) + .bind(block_height) + .execute(&self.pool) + .await?; + Ok(()) + } } diff --git a/src/payment_watcher.rs b/src/payment_watcher.rs index 80787f7..25d5921 100644 --- a/src/payment_watcher.rs +++ b/src/payment_watcher.rs @@ -72,6 +72,11 @@ async fn watch_payments(config: &Config, db: &Arc>) -> Result<() warn!("Failed to scan mempool: {}", e); } + // Update confirmations for known payments and process statuses + if let Err(e) = update_confirmations_and_status(&rpc_client, db, config).await { + warn!("Failed updating confirmations: {}", e); + } + sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await; } } @@ -232,3 +237,48 @@ async fn scan_mempool_for_payments( fn confirmations_sufficient(current: u32, required: u32) -> bool { current >= required } + +async fn update_confirmations_and_status( + rpc: &Client, + db: &Arc>, + config: &Config, +) -> Result<()> { + // Strategy: For all processes with PENDING or UNPAID and with btc_address, re-evaluate totals and confirmations + let db_lock = db.lock().await; + let processes = db_lock.get_all_processes().await?; + drop(db_lock); + + for mut process in processes.into_iter() { + if process.btc_address.is_none() { continue; } + // Retrieve payments for this address + let payments = { + let db_lock = db.lock().await; + db_lock.get_payments_for_address(process.btc_address.as_ref().unwrap()).await? + }; + let mut total_paid: i64 = 0; + for payment in payments.iter() { + // If confirmations unknown or low, refresh via RPC + if payment.confirmations < config.bitcoin.min_confirmations as i32 { + if let Ok(info) = rpc.get_raw_transaction_info(&payment.txid.parse()?, None) { + let conf = info.confirmations.unwrap_or(0) as i32; + let bh = info.blockheight.map(|h| h as i32); + let db_lock = db.lock().await; + let _ = db_lock.update_payment_confirmations(&payment.txid, conf, bh).await; + } + } + total_paid += payment.amount_sats; + } + // Update process totals and status + process.total_paid_sats = total_paid; + if confirmations_sufficient( + payments.iter().map(|p| p.confirmations as u32).max().unwrap_or(0), + config.bitcoin.min_confirmations, + ) { + process.payment_status = "PAID".to_string(); + } else if total_paid > 0 { process.payment_status = "PENDING".to_string(); } + + let db_lock = db.lock().await; + let _ = db_lock.upsert_process(&process).await; + } + Ok(()) +} diff --git a/src/websocket.rs b/src/websocket.rs index 2fc5502..cdf3478 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,4 +1,5 @@ -use actix::{Actor, StreamHandler, Addr, AsyncContext, ContextFutureSpawner, WrapFuture}; +use actix::{Actor, StreamHandler, Addr, AsyncContext, ContextFutureSpawner, WrapFuture, Handler}; +use actix::Message as ActixMessage; use actix_web::{web, HttpRequest, HttpResponse, Error}; use actix_web_actors::ws; use log::{info, debug}; @@ -175,12 +176,22 @@ lazy_static::lazy_static! { static ref WS_SESSIONS: Arc>>> = Arc::new(Mutex::new(HashMap::new())); } +#[derive(ActixMessage)] +#[rtype(result = "()")] +pub struct Broadcast(pub String); + +impl Handler for WsSession { + type Result = (); + fn handle(&mut self, msg: Broadcast, ctx: &mut ws::WebsocketContext) { + ctx.text(msg.0); + } +} + pub async fn broadcast_event(event: WsEvent) { let sessions = WS_SESSIONS.lock().await; let event_json = serde_json::to_string(&event).unwrap(); for (_id, addr) in sessions.iter() { - let msg = ws::Message::Text(event_json.clone()); - addr.do_send(msg); + addr.do_send(Broadcast(event_json.clone())); } }