chore: sync from 4NK_env superproject 2025-10-01T16:23:07Z

This commit is contained in:
4NK Dev 2025-10-01 16:23:07 +00:00
parent 8006ba9986
commit db0e11ce48
7 changed files with 312 additions and 48 deletions

94
.gitignore vendored
View File

@ -1,30 +1,82 @@
# Rust # 4NK Environment - Git Ignore
/target/ # ============================
Cargo.lock
**/*.rs.bk
*.pdb
# Config # Dossiers de sauvegarde des scripts
config.toml **/backup/
**/*backup*
# Logs **/.cargo/
*.log
# Database
# Fichiers temporaires
**/*.tmp*
**/*.temp*
**/*.log*
**/*.pid*
# Fichiers de configuration locale
**/*.env*
**/*.conf*
**/*.yaml*
**/*.yml*
**/*.ini*
**/*.json*
**/*.toml*
**/*.lock*
# Données et logs
**/*.logs*
**/*.data
*.db *.db
*.sqlite *.sqlite
# IDE # Certificats et clés
.idea/ **/*.key
.vscode/ **/*.pem
*.swp **/*.crt
*.swo **/*.p12
*~ **/*.pfx
ssl/
certs/
# Docker
**/*.docker*
# Cache et build
**/*.node_modules/
**/*.dist/
**/*build/
**/*target/
**/*.*.o
**/*.so
**/*.dylib
# IDE et éditeurs
**/*.vscode/
**/*.idea/
**/*.swp
**/*.swo
**/*~
# OS # OS
.DS_Store **/*.DS_Store
Thumbs.db **/*Thumbs.db
**/*tmp*
# Temporary # Git
/tmp/ **/*.git/
/data/ **/*.orig*
# Backup des projets existants
**/*backup*
**/*wallet*
**/*keys*
# Supervisor
supervisor-logs/
**/*node_modules*
**/*cursor*

View File

@ -68,7 +68,7 @@ path = "src/main.rs"
[[bin]] [[bin]]
name = "certificator-cli" name = "certificator-cli"
path = "src/bin/cli.rs" path = "src/cli.rs"
[dev-dependencies] [dev-dependencies]
mockito = "1" mockito = "1"

View File

@ -203,6 +203,104 @@ async fn perform_anchoring_cycle(config: &Config, db: &Arc<Mutex<Database>>) ->
Ok(()) Ok(())
} }
/// Anchor a specific process immediately. If `force` is true, skip payment and interval checks.
pub async fn anchor_process_now(
config: &Config,
db: &Arc<Mutex<Database>>,
target_process_id: &str,
force: bool,
) -> Result<Option<Txid>> {
// Connect RPC
let rpc_client = Client::new(
&config.bitcoin.rpc_url,
Auth::UserPass(
config.bitcoin.rpc_user.clone(),
config.bitcoin.rpc_password.clone(),
),
)?;
let blockchain_info = rpc_client.get_blockchain_info()?;
let current_block = blockchain_info.blocks as u32;
// Load process
let process = {
let db_lock = db.lock().await;
db_lock.get_process(target_process_id).await?
};
let Some(process) = process else { return Ok(None); };
// Payment/interval checks
if !force {
if process.payment_status != "PAID" { return Ok(None); }
}
// Determine period
let period_start = 0u32; // could be last anchor end; simplified for force
let period_end = current_block;
// Metrics
let metrics = {
let db_lock = db.lock().await;
db_lock.get_metrics_for_period(&process.process_id, period_start as i32, period_end as i32).await?
};
if metrics.is_empty() { return Ok(None); }
let total_bytes_sent: i64 = metrics.iter().map(|m| m.bytes_sent).sum();
let total_bytes_received: i64 = metrics.iter().map(|m| m.bytes_received).sum();
let total_messages: i32 = metrics.iter().map(|m| m.message_count).sum();
let total_bytes = (total_bytes_sent + total_bytes_received) as u64;
let total_mb = total_bytes / 1_048_576;
if !force {
if let (Some(price_mo_sats), Some(_addr)) = (process.price_mo_sats, &process.btc_address) {
let required = (price_mo_sats as u64) * total_mb;
if (process.total_paid_sats as u64) < required { return Ok(None); }
}
}
// Snapshot and hash
let snapshot = ProcessStateSnapshot {
process_id: process.process_id.clone(),
period_start_block: period_start,
period_end_block: period_end,
total_bytes_sent: total_bytes_sent as u64,
total_bytes_received: total_bytes_received as u64,
message_count: total_messages as u64,
participants: vec![],
state_merkle_root: process.state_merkle_root
.and_then(|bytes| bytes.try_into().ok())
.unwrap_or([0u8; 32]),
};
let anchor_hash = snapshot.compute_anchor_hash();
// Create anchor row
let anchor = Anchor {
id: None,
process_id: process.process_id.clone(),
anchor_hash: anchor_hash.to_vec(),
period_start_block: period_start as i32,
period_end_block: period_end as i32,
total_mb: total_mb as i64,
anchor_txid: None,
anchor_block: None,
created_at: Utc::now(),
status: "PENDING".to_string(),
};
let anchor_id = {
let db_lock = db.lock().await;
db_lock.insert_anchor(&anchor).await?
};
increment_anchors_created();
// Broadcast
let txid = create_and_broadcast_anchor_tx(&anchor_hash, &rpc_client, config).await?;
let _ = {
let db_lock = db.lock().await;
db_lock.update_anchor_tx(anchor_id, &txid.to_string(), "BROADCASTED", None).await
};
increment_anchors_confirmed();
Ok(Some(txid))
}
async fn create_and_broadcast_anchor_tx( async fn create_and_broadcast_anchor_tx(
anchor_hash: &[u8; 32], anchor_hash: &[u8; 32],
rpc: &Client, rpc: &Client,

View File

@ -4,7 +4,8 @@ use std::path::PathBuf;
use crate::config::Config; use crate::config::Config;
use crate::db::Database; use crate::db::Database;
use crate::models::ProcessStateSnapshot; use crate::anchor::{anchor_process_now, verify_anchor};
use bitcoincore_rpc::{Auth, Client, RpcApi};
#[derive(Parser)] #[derive(Parser)]
#[command(name = "certificator-cli")] #[command(name = "certificator-cli")]
@ -197,36 +198,67 @@ async fn force_anchor(
force: bool, force: bool,
) -> Result<()> { ) -> Result<()> {
println!("⚓ Forcing anchor creation for {}", process_id); println!("⚓ Forcing anchor creation for {}", process_id);
if !force { if !force {
println!("⚠️ Use --force to confirm anchor creation"); println!("⚠️ Use --force to confirm anchor creation");
return Ok(()); return Ok(());
} }
let db_arc = std::sync::Arc::new(tokio::sync::Mutex::new(db.clone()));
// TODO: Implement force anchor logic if let Some(txid) = anchor_process_now(config, &db_arc, process_id, true).await? {
println!("❌ Force anchor not yet implemented"); println!("✅ Anchor broadcasted: {}", txid);
} else {
println!("⚠️ Anchor not created (conditions not met)");
}
Ok(()) Ok(())
} }
async fn verify_anchor(config: &Config, hash: &str, txid: &str) -> Result<()> { async fn verify_anchor(config: &Config, hash: &str, txid: &str) -> Result<()> {
println!("🔍 Verifying anchor..."); println!("🔍 Verifying anchor...\nHash: {}\nTxid: {}", hash, txid);
println!("Hash: {}", hash); // Connect to RPC
println!("Txid: {}", txid); let rpc = Client::new(
&config.bitcoin.rpc_url,
// TODO: Implement verification logic Auth::UserPass(
println!("❌ Verification not yet implemented"); config.bitcoin.rpc_user.clone(),
config.bitcoin.rpc_password.clone(),
),
)?;
let t: bitcoin::Txid = txid.parse()?;
let info = rpc.get_raw_transaction_info(&t, None)?;
let Some(tx) = info.transaction() else { println!("❌ Cannot decode transaction"); return Ok(()); };
let bytes = hex::decode(hash)?;
let mut arr = [0u8; 32];
if bytes.len() != 32 { println!("❌ Invalid hash length"); return Ok(()); }
arr.copy_from_slice(&bytes);
let ok = verify_anchor(&tx, &arr);
println!("valid: {} | confirmations: {:?} | block_height: {:?}", ok, info.confirmations, info.blockheight);
Ok(()) Ok(())
} }
async fn watch_address(config: &Config, address: &str) -> Result<()> { async fn watch_address(config: &Config, address: &str) -> Result<()> {
println!("👀 Watching address: {}", address); println!("👀 Watching address: {}\nPress Ctrl+C to stop...", address);
println!("Press Ctrl+C to stop..."); let rpc = Client::new(
&config.bitcoin.rpc_url,
// TODO: Implement address watching Auth::UserPass(
println!("❌ Address watching not yet implemented"); config.bitcoin.rpc_user.clone(),
config.bitcoin.rpc_password.clone(),
),
)?;
loop {
let txids = rpc.get_raw_mempool().unwrap_or_default();
for txid in txids.iter().take(50) {
if let Ok(info) = rpc.get_raw_transaction_info(txid, None) {
if let Some(tx) = info.transaction() {
for out in tx.output {
if let Ok(addr) = bitcoin::Address::from_script(&out.script_pubkey, bitcoin::Network::Bitcoin) {
if addr.to_string() == address {
println!("💸 mempool: {} sats from {}", out.value.to_sat(), txid);
}
}
}
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(15)).await;
}
Ok(()) Ok(())
} }

View File

@ -288,4 +288,25 @@ impl Database {
Ok(()) Ok(())
} }
pub async fn update_payment_confirmations(
&self,
txid: &str,
confirmations: i32,
block_height: Option<i32>,
) -> Result<()> {
sqlx::query(
r#"
UPDATE payments
SET confirmations = $2, block_height = COALESCE($3, block_height)
WHERE txid = $1
"#
)
.bind(txid)
.bind(confirmations)
.bind(block_height)
.execute(&self.pool)
.await?;
Ok(())
}
} }

View File

@ -72,6 +72,11 @@ async fn watch_payments(config: &Config, db: &Arc<Mutex<Database>>) -> Result<()
warn!("Failed to scan mempool: {}", e); warn!("Failed to scan mempool: {}", e);
} }
// Update confirmations for known payments and process statuses
if let Err(e) = update_confirmations_and_status(&rpc_client, db, config).await {
warn!("Failed updating confirmations: {}", e);
}
sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await; sleep(Duration::from_secs(config.relay.monitor_interval_secs)).await;
} }
} }
@ -232,3 +237,48 @@ async fn scan_mempool_for_payments(
fn confirmations_sufficient(current: u32, required: u32) -> bool { fn confirmations_sufficient(current: u32, required: u32) -> bool {
current >= required current >= required
} }
async fn update_confirmations_and_status(
rpc: &Client,
db: &Arc<Mutex<Database>>,
config: &Config,
) -> Result<()> {
// Strategy: For all processes with PENDING or UNPAID and with btc_address, re-evaluate totals and confirmations
let db_lock = db.lock().await;
let processes = db_lock.get_all_processes().await?;
drop(db_lock);
for mut process in processes.into_iter() {
if process.btc_address.is_none() { continue; }
// Retrieve payments for this address
let payments = {
let db_lock = db.lock().await;
db_lock.get_payments_for_address(process.btc_address.as_ref().unwrap()).await?
};
let mut total_paid: i64 = 0;
for payment in payments.iter() {
// If confirmations unknown or low, refresh via RPC
if payment.confirmations < config.bitcoin.min_confirmations as i32 {
if let Ok(info) = rpc.get_raw_transaction_info(&payment.txid.parse()?, None) {
let conf = info.confirmations.unwrap_or(0) as i32;
let bh = info.blockheight.map(|h| h as i32);
let db_lock = db.lock().await;
let _ = db_lock.update_payment_confirmations(&payment.txid, conf, bh).await;
}
}
total_paid += payment.amount_sats;
}
// Update process totals and status
process.total_paid_sats = total_paid;
if confirmations_sufficient(
payments.iter().map(|p| p.confirmations as u32).max().unwrap_or(0),
config.bitcoin.min_confirmations,
) {
process.payment_status = "PAID".to_string();
} else if total_paid > 0 { process.payment_status = "PENDING".to_string(); }
let db_lock = db.lock().await;
let _ = db_lock.upsert_process(&process).await;
}
Ok(())
}

View File

@ -1,4 +1,5 @@
use actix::{Actor, StreamHandler, Addr, AsyncContext, ContextFutureSpawner, WrapFuture}; use actix::{Actor, StreamHandler, Addr, AsyncContext, ContextFutureSpawner, WrapFuture, Handler};
use actix::Message as ActixMessage;
use actix_web::{web, HttpRequest, HttpResponse, Error}; use actix_web::{web, HttpRequest, HttpResponse, Error};
use actix_web_actors::ws; use actix_web_actors::ws;
use log::{info, debug}; use log::{info, debug};
@ -175,12 +176,22 @@ lazy_static::lazy_static! {
static ref WS_SESSIONS: Arc<Mutex<HashMap<usize, Addr<WsSession>>>> = Arc::new(Mutex::new(HashMap::new())); static ref WS_SESSIONS: Arc<Mutex<HashMap<usize, Addr<WsSession>>>> = Arc::new(Mutex::new(HashMap::new()));
} }
#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Broadcast(pub String);
impl Handler<Broadcast> for WsSession {
type Result = ();
fn handle(&mut self, msg: Broadcast, ctx: &mut ws::WebsocketContext<Self>) {
ctx.text(msg.0);
}
}
pub async fn broadcast_event(event: WsEvent) { pub async fn broadcast_event(event: WsEvent) {
let sessions = WS_SESSIONS.lock().await; let sessions = WS_SESSIONS.lock().await;
let event_json = serde_json::to_string(&event).unwrap(); let event_json = serde_json::to_string(&event).unwrap();
for (_id, addr) in sessions.iter() { for (_id, addr) in sessions.iter() {
let msg = ws::Message::Text(event_json.clone()); addr.do_send(Broadcast(event_json.clone()));
addr.do_send(msg);
} }
} }