diff --git a/spec-technique.md b/spec-technique.md index 9868145..97712a9 100644 --- a/spec-technique.md +++ b/spec-technique.md @@ -59,7 +59,7 @@ pub static STORAGE: OnceLock> = OnceLock::new(); let mut retry_count = 0; const MAX_RETRIES: u32 = 5; const RETRY_DELAY_MS: u64 = 2000; - + let daemon = loop { match Daemon::connect( config.core_wallet.clone(), @@ -116,7 +116,7 @@ pub static STORAGE: OnceLock> = OnceLock::new(); // Handlers de mise à jour tokio::spawn(handle_scan_updates(scan_rx)); tokio::spawn(handle_state_updates(state_rx)); - + // Handler ZMQ tokio::spawn(async move { handle_zmq(zmq_url, blindbit_url).await; @@ -126,7 +126,7 @@ pub static STORAGE: OnceLock> = OnceLock::new(); 6. **Démarrage du serveur WebSocket** ```rust let listener = TcpListener::bind(config.ws_url).await?; - + while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection(stream, addr, our_sp_address)); } @@ -157,7 +157,7 @@ pub struct Config { impl Config { pub fn read_from_file(filename: &str) -> Result { let mut file_content = HashMap::new(); - + // Lecture ligne par ligne for line in reader.lines() { if let Ok(l) = line { @@ -165,21 +165,21 @@ impl Config { if l.starts_with('#') || l.trim().is_empty() { continue; } - + // Parse key=value if let Some((k, v)) = l.split_once('=') { file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned()); } } } - + // Construction de la config avec validation let config = Config { core_url: file_content.remove("core_url") .ok_or(Error::msg("No \"core_url\""))?, // ... autres champs }; - + Ok(config) } } @@ -224,7 +224,7 @@ impl Daemon { final_url.push_str("/wallet/"); final_url.push_str(wallet); } - + // Configuration de l'authentification let cookie_path = match cookie_path { Some(path) => path, @@ -238,17 +238,17 @@ impl Daemon { default_path } }; - + // Création du client RPC let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path)); let builder = jsonrpc::simple_http::SimpleHttpTransport::builder() .url(&final_url)? .timeout(Duration::from_secs(30)); - + let client = Client::from_jsonrpc(jsonrpc::Client::with_transport( builder.build(), )); - + Ok(Box::new(Daemon { client, network })) } } @@ -265,18 +265,18 @@ pub async fn scan_blocks( ) -> Result<()> { let daemon = DAEMON.get().unwrap().lock_anyhow()?; let current_tip = daemon.get_block_count()?; - + // Calcul de la hauteur de départ let start_height = current_tip - blocks_to_scan as u64; - + // Récupération des filtres depuis Blindbit let filters = get_filters_from_blindbit(blindbit_url, start_height, current_tip).await?; - + // Scan de chaque bloc for (height, filter) in filters { let block_hash = daemon.get_block_hash(height)?; let block = daemon.get_block(block_hash)?; - + // Analyse des transactions du bloc for tx in block.txdata { if let Some(sp_outputs) = check_transaction_alone(&tx, &filter) { @@ -285,7 +285,7 @@ pub async fn scan_blocks( } } } - + Ok(()) } ``` @@ -298,7 +298,7 @@ fn check_transaction_alone( filter: &BlockFilter, ) -> Option> { let mut sp_outputs = Vec::new(); - + // Vérification de chaque output for (vout, output) in tx.output.iter().enumerate() { // Test du filtre Blindbit @@ -309,7 +309,7 @@ fn check_transaction_alone( } } } - + if sp_outputs.is_empty() { None } else { @@ -320,27 +320,294 @@ fn check_transaction_alone( ### 5. Module message.rs - Gestion des messages WebSocket -#### Structure des messages +#### Architecture des messages + +Le système de messages de `sdk_relay` utilise une architecture en couches avec des enveloppes (`Envelope`) qui encapsulent différents types de messages selon leur flag. + +#### Structure Envelope ```rust #[derive(Debug, Serialize, Deserialize)] -pub enum MessageType { - Handshake(HandshakeMessage), - NewTx(NewTxMessage), - Broadcast(BroadcastMessage), - BalanceUpdate(BalanceUpdateMessage), - TxDetected(TxDetectedMessage), +pub struct Envelope { + pub flag: AnkFlag, // Type de message + pub content: String, // Contenu JSON du message } +``` -#[derive(Debug, Serialize, Deserialize)] +#### Types de messages (AnkFlag) + +```rust +#[derive(Debug, Serialize, Deserialize, Tsify)] +pub enum AnkFlag { + NewTx, // 0 - Transaction Bitcoin + Faucet, // 1 - Service de faucet + Cipher, // 2 - Messages chiffrés + Commit, // 3 - Messages de commit + Handshake, // 4 - Poignée de main initiale + Sync, // 5 - Synchronisation (non implémenté) + Unknown, // - Messages inconnus +} +``` + +#### 1. Message Handshake (AnkFlag::Handshake) + +**Structure :** +```rust +#[derive(Debug, Serialize, Deserialize, Tsify)] pub struct HandshakeMessage { - pub version: String, - pub capabilities: Vec, + pub sp_address: String, // Adresse Silent Payment du client + pub peers_list: OutPointMemberMap, // Liste des pairs connectés + pub processes_list: OutPointProcessMap, // Liste des processus actifs + pub chain_tip: u32, // Hauteur actuelle de la blockchain } +``` -#[derive(Debug, Serialize, Deserialize)] +**Champs détaillés :** +- **sp_address** : Adresse Silent Payment du client qui se connecte +- **peers_list** : Map des pairs connectés (OutPoint → Member) +- **processes_list** : Map des processus actifs (OutPoint → Process) +- **chain_tip** : Hauteur de la blockchain pour synchronisation + +**Utilisation :** Échange initial lors de la connexion WebSocket pour synchroniser l'état + +#### 2. Message NewTx (AnkFlag::NewTx) + +**Structure :** +```rust +#[derive(Debug, PartialEq, Serialize, Deserialize, Tsify)] pub struct NewTxMessage { - pub transaction: String, // Transaction hex + pub transaction: String, // Transaction Bitcoin en hex + pub tweak_data: Option, // Données de tweak Silent Payment + pub error: Option, // Erreur éventuelle +} +``` + +**Champs détaillés :** +- **transaction** : Transaction Bitcoin sérialisée en format hexadécimal +- **tweak_data** : Données de tweak pour les Silent Payments (optionnel) +- **error** : Erreur éventuelle lors du traitement + +**Utilisation :** Diffusion de nouvelles transactions Bitcoin à tous les pairs + +#### 3. Message Commit (AnkFlag::Commit) + +**Structure :** +```rust +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Tsify)] +pub struct CommitMessage { + pub process_id: OutPoint, // Identifiant du processus + pub pcd_commitment: PcdCommitments, // Engagements PCD + pub roles: Roles, // Rôles des participants + pub public_data: Pcd, // Données publiques + pub validation_tokens: Vec, // Tokens de validation + pub error: Option, // Erreur éventuelle +} +``` + +**Champs détaillés :** +- **process_id** : Identifiant unique du processus (OutPoint) +- **pcd_commitment** : Map des engagements PCD (champ → hash) +- **roles** : Définition des rôles et permissions +- **public_data** : Données publiques du processus +- **validation_tokens** : Preuves cryptographiques de validation +- **error** : Erreur éventuelle lors du traitement + +**Utilisation :** Gestion des processus collaboratifs et de leurs états + +#### 4. Message Faucet (AnkFlag::Faucet) + +**Structure :** +```rust +#[derive(Debug, Serialize, Deserialize, Tsify)] +pub struct FaucetMessage { + pub sp_address: String, // Adresse Silent Payment + pub commitment: String, // Engagement cryptographique + pub error: Option, // Erreur éventuelle +} +``` + +**Champs détaillés :** +- **sp_address** : Adresse Silent Payment du demandeur +- **commitment** : Engagement cryptographique pour éviter les abus +- **error** : Erreur éventuelle lors du traitement + +**Utilisation :** Service de faucet pour obtenir des fonds de test + +#### 5. Message Cipher (AnkFlag::Cipher) + +**Structure :** Le contenu est une chaîne JSON contenant des données chiffrées. + +**Champs détaillés :** +- **content** : Données chiffrées en format hexadécimal + +**Utilisation :** Communication chiffrée entre pairs pour les messages privés + +#### 6. Message Sync (AnkFlag::Sync) + +**Structure :** Non implémentée actuellement (todo!) + +**Utilisation :** Synchronisation avancée entre pairs (futur) + +#### 7. Message Unknown (AnkFlag::Unknown) + +**Structure :** Messages de type inconnu + +**Utilisation :** Gestion des messages non reconnus + +#### Cache de messages + +```rust +#[derive(Debug)] +pub(crate) struct MessageCache { + store: Mutex>, +} +``` + +**Fonctionnalités :** +- **Déduplication** : Évite le traitement multiple du même message +- **Expiration** : Messages supprimés après 20 secondes +- **Nettoyage automatique** : Toutes les 5 secondes + +#### Types de broadcast + +```rust +pub(crate) enum BroadcastType { + Sender(SocketAddr), // Envoi au seul expéditeur + ExcludeSender(SocketAddr), // Envoi à tous sauf l'expéditeur + ToAll, // Envoi à tous les pairs +} +``` + +#### Types de données complexes + +##### OutPointMemberMap et OutPointProcessMap + +```rust +// Map des pairs connectés +pub type OutPointMemberMap = HashMap; + +// Map des processus actifs +pub type OutPointProcessMap = HashMap; +``` + +##### Structure Member + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, Tsify)] +pub struct Member { + pub outpoint: OutPoint, // Identifiant unique du membre + pub public_key: PublicKey, // Clé publique du membre + pub balance: Amount, // Solde actuel + pub last_commit: Option, // Dernier commit effectué +} +``` + +##### Structure Process + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, Tsify)] +pub struct Process { + pub process_id: OutPoint, // Identifiant du processus + pub state_id: [u8; 32], // ID de l'état actuel + pub roles: Roles, // Rôles des participants + pub public_data: Pcd, // Données publiques + pub validation_tokens: Vec, // Tokens de validation +} +``` + +##### Structure Pcd (Process Control Data) + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, Tsify)] +pub struct Pcd { + pub fields: HashMap, // Champs de données + pub validation_rules: Vec, // Règles de validation +} +``` + +##### Structure Roles + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, Tsify)] +pub struct Roles { + pub role_definitions: HashMap, // Définitions des rôles +} +``` + +##### Structure Proof + +```rust +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub struct Proof { + signature: Signature, // Signature Schnorr + message: [u8; 32] // Hash du message signé +} +``` + +##### Structure PcdCommitments + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, Tsify)] +pub struct PcdCommitments { + pub commitments: HashMap, // Map champ → hash +} +``` + +#### Format des messages JSON + +##### Exemple Handshake + +```json +{ + "flag": "Handshake", + "content": "{\"sp_address\":\"sp1...\",\"peers_list\":{},\"processes_list\":{},\"chain_tip\":123456}" +} +``` + +##### Exemple NewTx + +```json +{ + "flag": "NewTx", + "content": "{\"transaction\":\"02000000...\",\"tweak_data\":null,\"error\":null}" +} +``` + +##### Exemple Commit + +```json +{ + "flag": "Commit", + "content": "{\"process_id\":\"...\",\"pcd_commitment\":{},\"roles\":{},\"public_data\":{},\"validation_tokens\":[],\"error\":null}" +} +``` + +#### Traitement des messages + +```rust +pub fn process_message(raw_msg: &str, addr: SocketAddr) { + // Vérification du cache pour éviter les doublons + let cache = MESSAGECACHE.get().expect("Cache should be initialized"); + if cache.contains(raw_msg) { + log::debug!("Message already processed, dropping"); + return; + } else { + cache.insert(raw_msg.to_owned()); + } + + // Parsing de l'enveloppe + match serde_json::from_str::(raw_msg) { + Ok(ank_msg) => match ank_msg.flag { + AnkFlag::Faucet => process_faucet_message(ank_msg, addr), + AnkFlag::NewTx => process_new_tx_message(ank_msg, addr), + AnkFlag::Cipher => process_cipher_message(ank_msg, addr), + AnkFlag::Commit => process_commit_message(ank_msg, addr), + AnkFlag::Unknown => process_unknown_message(ank_msg, addr), + AnkFlag::Sync => todo!(), + AnkFlag::Handshake => log::debug!("Received init message from {}", addr), + }, + Err(_) => log::error!("Failed to parse network message"), + } } ``` @@ -360,20 +627,20 @@ async fn handle_connection( return; } }; - + // Ajout à la map des peers let (tx, rx) = unbounded_channel(); { let mut peer_map = PEERMAP.get().unwrap().lock_anyhow().unwrap(); peer_map.insert(addr, tx); } - + // Boucle de traitement des messages let (mut ws_sender, mut ws_receiver) = ws_stream.split(); - + // Handler des messages entrants let incoming = rx.map(Ok).forward(ws_sender); - + // Handler des messages sortants let outgoing = ws_receiver.try_for_each(|msg| async { match msg { @@ -386,7 +653,7 @@ async fn handle_connection( } Ok(()) }); - + // Exécution concurrente pin_mut!(incoming, outgoing); future::select(incoming, outgoing).await; @@ -403,34 +670,34 @@ pub async fn process_message(message: MessageType, addr: SocketAddr) -> Result<( if handshake.version != "1.0" { return Err(Error::msg("Unsupported version")); } - + // Envoi de la confirmation let response = MessageType::Handshake(HandshakeMessage { version: "1.0".to_string(), capabilities: vec!["silent_payments".to_string(), "broadcast".to_string()], }); - + broadcast_to_peer(addr, response).await?; } - + MessageType::NewTx(new_tx) => { // Validation et broadcast de la transaction let tx = deserialize::(&Vec::from_hex(&new_tx.transaction)?)?; - + let daemon = DAEMON.get().unwrap().lock_anyhow()?; daemon.test_mempool_accept(&tx)?; daemon.broadcast(&tx)?; - + // Notification aux autres peers broadcast_message(BroadcastType::NewTx(new_tx), Some(addr)).await?; } - + MessageType::Broadcast(broadcast) => { // Relay du message broadcast broadcast_message(BroadcastType::Custom(broadcast), Some(addr)).await?; } } - + Ok(()) } ``` @@ -470,25 +737,25 @@ pub fn lock_members() -> Result>> pub fn add_member(outpoint: OutPoint, public_key: PublicKey) -> Result<()> { let mut members = lock_members()?; - + let member = Member { outpoint, public_key, balance: Amount::ZERO, last_commit: None, }; - + members.insert(outpoint, member); Ok(()) } pub fn update_member_balance(outpoint: OutPoint, amount: Amount) -> Result<()> { let mut members = lock_members()?; - + if let Some(member) = members.get_mut(&outpoint) { member.balance = amount; } - + Ok(()) } ``` @@ -520,17 +787,17 @@ impl StateFile { .write(true) .truncate(true) .open(&self.path)?; - + let stringified = serde_json::to_string(&json)?; f.write_all(stringified.as_bytes())?; Ok(()) } - + fn load(&self) -> Result { let mut f = fs::File::open(&self.path)?; let mut content = vec![]; f.read_to_end(&mut content)?; - + let res: Value = serde_json::from_slice(&content)?; Ok(res) } @@ -542,41 +809,41 @@ impl StateFile { ### 1. Initialisation du service ``` -main() → Config::read_from_file() → Daemon::connect() → -SpWallet::new() → load_persistent_state() → +main() → Config::read_from_file() → Daemon::connect() → +SpWallet::new() → load_persistent_state() → spawn_handlers() → TcpListener::bind() → accept_loop() ``` ### 2. Traitement d'une connexion WebSocket ``` -TcpListener::accept() → handle_connection() → -tokio_tungstenite::accept_async() → +TcpListener::accept() → handle_connection() → +tokio_tungstenite::accept_async() → PEERMAP.insert() → message_loop() ``` ### 3. Traitement d'un message ``` -Message::Text() → serde_json::from_str() → -process_message() → match MessageType → +Message::Text() → serde_json::from_str() → +process_message() → match MessageType → handle_specific_message() → broadcast_to_peers() ``` ### 4. Scan des blocs ``` -handle_zmq() → scan_blocks() → -get_filters_from_blindbit() → -check_transaction_alone() → +handle_zmq() → scan_blocks() → +get_filters_from_blindbit() → +check_transaction_alone() → process_sp_outputs() → update_wallet() ``` ### 5. Broadcast de transaction ``` -NewTxMessage → deserialize() → -test_mempool_accept() → broadcast() → +NewTxMessage → deserialize() → +test_mempool_accept() → broadcast() → broadcast_message() → send_to_all_peers() ``` @@ -697,9 +964,9 @@ const MAX_CONNECTIONS: usize = 1000; ```rust log::info!("Using wallet with address {}", our_sp_address); -log::info!("Found {} outputs for a total balance of {}", +log::info!("Found {} outputs for a total balance of {}", sp_wallet.get_outputs().len(), sp_wallet.get_balance()); -log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}", +log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}", retry_count, MAX_RETRIES, e); ``` @@ -731,23 +998,23 @@ TcpStream::connect(ws_url).await?; #[cfg(test)] mod tests { use super::*; - + #[test] fn test_config_parsing() { let config = Config::read_from_file("test.conf").unwrap(); assert_eq!(config.network, Network::Signet); } - + #[test] fn test_message_serialization() { let message = MessageType::Handshake(HandshakeMessage { version: "1.0".to_string(), capabilities: vec!["silent_payments".to_string()], }); - + let serialized = serde_json::to_string(&message).unwrap(); let deserialized: MessageType = serde_json::from_str(&serialized).unwrap(); - + assert!(matches!(deserialized, MessageType::Handshake(_))); } } @@ -761,12 +1028,12 @@ async fn test_websocket_connection() { // Setup let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - + // Test let client = tokio_tungstenite::connect_async( format!("ws://{}", addr) ).await.unwrap(); - + // Assertions assert!(client.0.is_ok()); }