docs: enrichir la spécification technique avec les types de messages détaillés - Ajout de l'architecture des messages avec Envelope et AnkFlag - Documentation complète de tous les types de messages (Handshake, NewTx, Commit, Faucet, Cipher, Sync, Unknown) - Description détaillée des champs et structures de données - Exemples JSON des formats de messages - Documentation du cache de messages et des types de broadcast - Ajout des types de données complexes (Member, Process, Pcd, Roles, Proof, etc.) - Explication du traitement des messages et de la déduplication
This commit is contained in:
parent
28eae8759c
commit
c5597d7249
@ -59,7 +59,7 @@ pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
|
|||||||
let mut retry_count = 0;
|
let mut retry_count = 0;
|
||||||
const MAX_RETRIES: u32 = 5;
|
const MAX_RETRIES: u32 = 5;
|
||||||
const RETRY_DELAY_MS: u64 = 2000;
|
const RETRY_DELAY_MS: u64 = 2000;
|
||||||
|
|
||||||
let daemon = loop {
|
let daemon = loop {
|
||||||
match Daemon::connect(
|
match Daemon::connect(
|
||||||
config.core_wallet.clone(),
|
config.core_wallet.clone(),
|
||||||
@ -116,7 +116,7 @@ pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
|
|||||||
// Handlers de mise à jour
|
// Handlers de mise à jour
|
||||||
tokio::spawn(handle_scan_updates(scan_rx));
|
tokio::spawn(handle_scan_updates(scan_rx));
|
||||||
tokio::spawn(handle_state_updates(state_rx));
|
tokio::spawn(handle_state_updates(state_rx));
|
||||||
|
|
||||||
// Handler ZMQ
|
// Handler ZMQ
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
handle_zmq(zmq_url, blindbit_url).await;
|
handle_zmq(zmq_url, blindbit_url).await;
|
||||||
@ -126,7 +126,7 @@ pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
|
|||||||
6. **Démarrage du serveur WebSocket**
|
6. **Démarrage du serveur WebSocket**
|
||||||
```rust
|
```rust
|
||||||
let listener = TcpListener::bind(config.ws_url).await?;
|
let listener = TcpListener::bind(config.ws_url).await?;
|
||||||
|
|
||||||
while let Ok((stream, addr)) = listener.accept().await {
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
tokio::spawn(handle_connection(stream, addr, our_sp_address));
|
tokio::spawn(handle_connection(stream, addr, our_sp_address));
|
||||||
}
|
}
|
||||||
@ -157,7 +157,7 @@ pub struct Config {
|
|||||||
impl Config {
|
impl Config {
|
||||||
pub fn read_from_file(filename: &str) -> Result<Self> {
|
pub fn read_from_file(filename: &str) -> Result<Self> {
|
||||||
let mut file_content = HashMap::new();
|
let mut file_content = HashMap::new();
|
||||||
|
|
||||||
// Lecture ligne par ligne
|
// Lecture ligne par ligne
|
||||||
for line in reader.lines() {
|
for line in reader.lines() {
|
||||||
if let Ok(l) = line {
|
if let Ok(l) = line {
|
||||||
@ -165,21 +165,21 @@ impl Config {
|
|||||||
if l.starts_with('#') || l.trim().is_empty() {
|
if l.starts_with('#') || l.trim().is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse key=value
|
// Parse key=value
|
||||||
if let Some((k, v)) = l.split_once('=') {
|
if let Some((k, v)) = l.split_once('=') {
|
||||||
file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned());
|
file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construction de la config avec validation
|
// Construction de la config avec validation
|
||||||
let config = Config {
|
let config = Config {
|
||||||
core_url: file_content.remove("core_url")
|
core_url: file_content.remove("core_url")
|
||||||
.ok_or(Error::msg("No \"core_url\""))?,
|
.ok_or(Error::msg("No \"core_url\""))?,
|
||||||
// ... autres champs
|
// ... autres champs
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -224,7 +224,7 @@ impl Daemon {
|
|||||||
final_url.push_str("/wallet/");
|
final_url.push_str("/wallet/");
|
||||||
final_url.push_str(wallet);
|
final_url.push_str(wallet);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configuration de l'authentification
|
// Configuration de l'authentification
|
||||||
let cookie_path = match cookie_path {
|
let cookie_path = match cookie_path {
|
||||||
Some(path) => path,
|
Some(path) => path,
|
||||||
@ -238,17 +238,17 @@ impl Daemon {
|
|||||||
default_path
|
default_path
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Création du client RPC
|
// Création du client RPC
|
||||||
let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path));
|
let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path));
|
||||||
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
|
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
|
||||||
.url(&final_url)?
|
.url(&final_url)?
|
||||||
.timeout(Duration::from_secs(30));
|
.timeout(Duration::from_secs(30));
|
||||||
|
|
||||||
let client = Client::from_jsonrpc(jsonrpc::Client::with_transport(
|
let client = Client::from_jsonrpc(jsonrpc::Client::with_transport(
|
||||||
builder.build(),
|
builder.build(),
|
||||||
));
|
));
|
||||||
|
|
||||||
Ok(Box::new(Daemon { client, network }))
|
Ok(Box::new(Daemon { client, network }))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -265,18 +265,18 @@ pub async fn scan_blocks(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||||
let current_tip = daemon.get_block_count()?;
|
let current_tip = daemon.get_block_count()?;
|
||||||
|
|
||||||
// Calcul de la hauteur de départ
|
// Calcul de la hauteur de départ
|
||||||
let start_height = current_tip - blocks_to_scan as u64;
|
let start_height = current_tip - blocks_to_scan as u64;
|
||||||
|
|
||||||
// Récupération des filtres depuis Blindbit
|
// Récupération des filtres depuis Blindbit
|
||||||
let filters = get_filters_from_blindbit(blindbit_url, start_height, current_tip).await?;
|
let filters = get_filters_from_blindbit(blindbit_url, start_height, current_tip).await?;
|
||||||
|
|
||||||
// Scan de chaque bloc
|
// Scan de chaque bloc
|
||||||
for (height, filter) in filters {
|
for (height, filter) in filters {
|
||||||
let block_hash = daemon.get_block_hash(height)?;
|
let block_hash = daemon.get_block_hash(height)?;
|
||||||
let block = daemon.get_block(block_hash)?;
|
let block = daemon.get_block(block_hash)?;
|
||||||
|
|
||||||
// Analyse des transactions du bloc
|
// Analyse des transactions du bloc
|
||||||
for tx in block.txdata {
|
for tx in block.txdata {
|
||||||
if let Some(sp_outputs) = check_transaction_alone(&tx, &filter) {
|
if let Some(sp_outputs) = check_transaction_alone(&tx, &filter) {
|
||||||
@ -285,7 +285,7 @@ pub async fn scan_blocks(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -298,7 +298,7 @@ fn check_transaction_alone(
|
|||||||
filter: &BlockFilter,
|
filter: &BlockFilter,
|
||||||
) -> Option<Vec<SpOutput>> {
|
) -> Option<Vec<SpOutput>> {
|
||||||
let mut sp_outputs = Vec::new();
|
let mut sp_outputs = Vec::new();
|
||||||
|
|
||||||
// Vérification de chaque output
|
// Vérification de chaque output
|
||||||
for (vout, output) in tx.output.iter().enumerate() {
|
for (vout, output) in tx.output.iter().enumerate() {
|
||||||
// Test du filtre Blindbit
|
// Test du filtre Blindbit
|
||||||
@ -309,7 +309,7 @@ fn check_transaction_alone(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if sp_outputs.is_empty() {
|
if sp_outputs.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@ -320,27 +320,294 @@ fn check_transaction_alone(
|
|||||||
|
|
||||||
### 5. Module message.rs - Gestion des messages WebSocket
|
### 5. Module message.rs - Gestion des messages WebSocket
|
||||||
|
|
||||||
#### Structure des messages
|
#### Architecture des messages
|
||||||
|
|
||||||
|
Le système de messages de `sdk_relay` utilise une architecture en couches avec des enveloppes (`Envelope`) qui encapsulent différents types de messages selon leur flag.
|
||||||
|
|
||||||
|
#### Structure Envelope
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum MessageType {
|
pub struct Envelope {
|
||||||
Handshake(HandshakeMessage),
|
pub flag: AnkFlag, // Type de message
|
||||||
NewTx(NewTxMessage),
|
pub content: String, // Contenu JSON du message
|
||||||
Broadcast(BroadcastMessage),
|
|
||||||
BalanceUpdate(BalanceUpdateMessage),
|
|
||||||
TxDetected(TxDetectedMessage),
|
|
||||||
}
|
}
|
||||||
|
```
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#### Types de messages (AnkFlag)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Tsify)]
|
||||||
|
pub enum AnkFlag {
|
||||||
|
NewTx, // 0 - Transaction Bitcoin
|
||||||
|
Faucet, // 1 - Service de faucet
|
||||||
|
Cipher, // 2 - Messages chiffrés
|
||||||
|
Commit, // 3 - Messages de commit
|
||||||
|
Handshake, // 4 - Poignée de main initiale
|
||||||
|
Sync, // 5 - Synchronisation (non implémenté)
|
||||||
|
Unknown, // - Messages inconnus
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 1. Message Handshake (AnkFlag::Handshake)
|
||||||
|
|
||||||
|
**Structure :**
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Tsify)]
|
||||||
pub struct HandshakeMessage {
|
pub struct HandshakeMessage {
|
||||||
pub version: String,
|
pub sp_address: String, // Adresse Silent Payment du client
|
||||||
pub capabilities: Vec<String>,
|
pub peers_list: OutPointMemberMap, // Liste des pairs connectés
|
||||||
|
pub processes_list: OutPointProcessMap, // Liste des processus actifs
|
||||||
|
pub chain_tip: u32, // Hauteur actuelle de la blockchain
|
||||||
}
|
}
|
||||||
|
```
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
**Champs détaillés :**
|
||||||
|
- **sp_address** : Adresse Silent Payment du client qui se connecte
|
||||||
|
- **peers_list** : Map des pairs connectés (OutPoint → Member)
|
||||||
|
- **processes_list** : Map des processus actifs (OutPoint → Process)
|
||||||
|
- **chain_tip** : Hauteur de la blockchain pour synchronisation
|
||||||
|
|
||||||
|
**Utilisation :** Échange initial lors de la connexion WebSocket pour synchroniser l'état
|
||||||
|
|
||||||
|
#### 2. Message NewTx (AnkFlag::NewTx)
|
||||||
|
|
||||||
|
**Structure :**
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, PartialEq, Serialize, Deserialize, Tsify)]
|
||||||
pub struct NewTxMessage {
|
pub struct NewTxMessage {
|
||||||
pub transaction: String, // Transaction hex
|
pub transaction: String, // Transaction Bitcoin en hex
|
||||||
|
pub tweak_data: Option<String>, // Données de tweak Silent Payment
|
||||||
|
pub error: Option<AnkError>, // Erreur éventuelle
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Champs détaillés :**
|
||||||
|
- **transaction** : Transaction Bitcoin sérialisée en format hexadécimal
|
||||||
|
- **tweak_data** : Données de tweak pour les Silent Payments (optionnel)
|
||||||
|
- **error** : Erreur éventuelle lors du traitement
|
||||||
|
|
||||||
|
**Utilisation :** Diffusion de nouvelles transactions Bitcoin à tous les pairs
|
||||||
|
|
||||||
|
#### 3. Message Commit (AnkFlag::Commit)
|
||||||
|
|
||||||
|
**Structure :**
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct CommitMessage {
|
||||||
|
pub process_id: OutPoint, // Identifiant du processus
|
||||||
|
pub pcd_commitment: PcdCommitments, // Engagements PCD
|
||||||
|
pub roles: Roles, // Rôles des participants
|
||||||
|
pub public_data: Pcd, // Données publiques
|
||||||
|
pub validation_tokens: Vec<Proof>, // Tokens de validation
|
||||||
|
pub error: Option<AnkError>, // Erreur éventuelle
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Champs détaillés :**
|
||||||
|
- **process_id** : Identifiant unique du processus (OutPoint)
|
||||||
|
- **pcd_commitment** : Map des engagements PCD (champ → hash)
|
||||||
|
- **roles** : Définition des rôles et permissions
|
||||||
|
- **public_data** : Données publiques du processus
|
||||||
|
- **validation_tokens** : Preuves cryptographiques de validation
|
||||||
|
- **error** : Erreur éventuelle lors du traitement
|
||||||
|
|
||||||
|
**Utilisation :** Gestion des processus collaboratifs et de leurs états
|
||||||
|
|
||||||
|
#### 4. Message Faucet (AnkFlag::Faucet)
|
||||||
|
|
||||||
|
**Structure :**
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct FaucetMessage {
|
||||||
|
pub sp_address: String, // Adresse Silent Payment
|
||||||
|
pub commitment: String, // Engagement cryptographique
|
||||||
|
pub error: Option<AnkError>, // Erreur éventuelle
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Champs détaillés :**
|
||||||
|
- **sp_address** : Adresse Silent Payment du demandeur
|
||||||
|
- **commitment** : Engagement cryptographique pour éviter les abus
|
||||||
|
- **error** : Erreur éventuelle lors du traitement
|
||||||
|
|
||||||
|
**Utilisation :** Service de faucet pour obtenir des fonds de test
|
||||||
|
|
||||||
|
#### 5. Message Cipher (AnkFlag::Cipher)
|
||||||
|
|
||||||
|
**Structure :** Le contenu est une chaîne JSON contenant des données chiffrées.
|
||||||
|
|
||||||
|
**Champs détaillés :**
|
||||||
|
- **content** : Données chiffrées en format hexadécimal
|
||||||
|
|
||||||
|
**Utilisation :** Communication chiffrée entre pairs pour les messages privés
|
||||||
|
|
||||||
|
#### 6. Message Sync (AnkFlag::Sync)
|
||||||
|
|
||||||
|
**Structure :** Non implémentée actuellement (todo!)
|
||||||
|
|
||||||
|
**Utilisation :** Synchronisation avancée entre pairs (futur)
|
||||||
|
|
||||||
|
#### 7. Message Unknown (AnkFlag::Unknown)
|
||||||
|
|
||||||
|
**Structure :** Messages de type inconnu
|
||||||
|
|
||||||
|
**Utilisation :** Gestion des messages non reconnus
|
||||||
|
|
||||||
|
#### Cache de messages
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct MessageCache {
|
||||||
|
store: Mutex<HashMap<String, Instant>>,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Fonctionnalités :**
|
||||||
|
- **Déduplication** : Évite le traitement multiple du même message
|
||||||
|
- **Expiration** : Messages supprimés après 20 secondes
|
||||||
|
- **Nettoyage automatique** : Toutes les 5 secondes
|
||||||
|
|
||||||
|
#### Types de broadcast
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub(crate) enum BroadcastType {
|
||||||
|
Sender(SocketAddr), // Envoi au seul expéditeur
|
||||||
|
ExcludeSender(SocketAddr), // Envoi à tous sauf l'expéditeur
|
||||||
|
ToAll, // Envoi à tous les pairs
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Types de données complexes
|
||||||
|
|
||||||
|
##### OutPointMemberMap et OutPointProcessMap
|
||||||
|
|
||||||
|
```rust
|
||||||
|
// Map des pairs connectés
|
||||||
|
pub type OutPointMemberMap = HashMap<OutPoint, Member>;
|
||||||
|
|
||||||
|
// Map des processus actifs
|
||||||
|
pub type OutPointProcessMap = HashMap<OutPoint, Process>;
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure Member
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct Member {
|
||||||
|
pub outpoint: OutPoint, // Identifiant unique du membre
|
||||||
|
pub public_key: PublicKey, // Clé publique du membre
|
||||||
|
pub balance: Amount, // Solde actuel
|
||||||
|
pub last_commit: Option<Commit>, // Dernier commit effectué
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure Process
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct Process {
|
||||||
|
pub process_id: OutPoint, // Identifiant du processus
|
||||||
|
pub state_id: [u8; 32], // ID de l'état actuel
|
||||||
|
pub roles: Roles, // Rôles des participants
|
||||||
|
pub public_data: Pcd, // Données publiques
|
||||||
|
pub validation_tokens: Vec<Proof>, // Tokens de validation
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure Pcd (Process Control Data)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct Pcd {
|
||||||
|
pub fields: HashMap<String, Field>, // Champs de données
|
||||||
|
pub validation_rules: Vec<ValidationRule>, // Règles de validation
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure Roles
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct Roles {
|
||||||
|
pub role_definitions: HashMap<String, RoleDefinition>, // Définitions des rôles
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure Proof
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||||
|
pub struct Proof {
|
||||||
|
signature: Signature, // Signature Schnorr
|
||||||
|
message: [u8; 32] // Hash du message signé
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Structure PcdCommitments
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
|
||||||
|
pub struct PcdCommitments {
|
||||||
|
pub commitments: HashMap<String, [u8; 32]>, // Map champ → hash
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Format des messages JSON
|
||||||
|
|
||||||
|
##### Exemple Handshake
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"flag": "Handshake",
|
||||||
|
"content": "{\"sp_address\":\"sp1...\",\"peers_list\":{},\"processes_list\":{},\"chain_tip\":123456}"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Exemple NewTx
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"flag": "NewTx",
|
||||||
|
"content": "{\"transaction\":\"02000000...\",\"tweak_data\":null,\"error\":null}"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Exemple Commit
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"flag": "Commit",
|
||||||
|
"content": "{\"process_id\":\"...\",\"pcd_commitment\":{},\"roles\":{},\"public_data\":{},\"validation_tokens\":[],\"error\":null}"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Traitement des messages
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub fn process_message(raw_msg: &str, addr: SocketAddr) {
|
||||||
|
// Vérification du cache pour éviter les doublons
|
||||||
|
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
|
||||||
|
if cache.contains(raw_msg) {
|
||||||
|
log::debug!("Message already processed, dropping");
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
cache.insert(raw_msg.to_owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parsing de l'enveloppe
|
||||||
|
match serde_json::from_str::<Envelope>(raw_msg) {
|
||||||
|
Ok(ank_msg) => match ank_msg.flag {
|
||||||
|
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
|
||||||
|
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
|
||||||
|
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
|
||||||
|
AnkFlag::Commit => process_commit_message(ank_msg, addr),
|
||||||
|
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
|
||||||
|
AnkFlag::Sync => todo!(),
|
||||||
|
AnkFlag::Handshake => log::debug!("Received init message from {}", addr),
|
||||||
|
},
|
||||||
|
Err(_) => log::error!("Failed to parse network message"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -360,20 +627,20 @@ async fn handle_connection(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Ajout à la map des peers
|
// Ajout à la map des peers
|
||||||
let (tx, rx) = unbounded_channel();
|
let (tx, rx) = unbounded_channel();
|
||||||
{
|
{
|
||||||
let mut peer_map = PEERMAP.get().unwrap().lock_anyhow().unwrap();
|
let mut peer_map = PEERMAP.get().unwrap().lock_anyhow().unwrap();
|
||||||
peer_map.insert(addr, tx);
|
peer_map.insert(addr, tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Boucle de traitement des messages
|
// Boucle de traitement des messages
|
||||||
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
|
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
|
||||||
|
|
||||||
// Handler des messages entrants
|
// Handler des messages entrants
|
||||||
let incoming = rx.map(Ok).forward(ws_sender);
|
let incoming = rx.map(Ok).forward(ws_sender);
|
||||||
|
|
||||||
// Handler des messages sortants
|
// Handler des messages sortants
|
||||||
let outgoing = ws_receiver.try_for_each(|msg| async {
|
let outgoing = ws_receiver.try_for_each(|msg| async {
|
||||||
match msg {
|
match msg {
|
||||||
@ -386,7 +653,7 @@ async fn handle_connection(
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
// Exécution concurrente
|
// Exécution concurrente
|
||||||
pin_mut!(incoming, outgoing);
|
pin_mut!(incoming, outgoing);
|
||||||
future::select(incoming, outgoing).await;
|
future::select(incoming, outgoing).await;
|
||||||
@ -403,34 +670,34 @@ pub async fn process_message(message: MessageType, addr: SocketAddr) -> Result<(
|
|||||||
if handshake.version != "1.0" {
|
if handshake.version != "1.0" {
|
||||||
return Err(Error::msg("Unsupported version"));
|
return Err(Error::msg("Unsupported version"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Envoi de la confirmation
|
// Envoi de la confirmation
|
||||||
let response = MessageType::Handshake(HandshakeMessage {
|
let response = MessageType::Handshake(HandshakeMessage {
|
||||||
version: "1.0".to_string(),
|
version: "1.0".to_string(),
|
||||||
capabilities: vec!["silent_payments".to_string(), "broadcast".to_string()],
|
capabilities: vec!["silent_payments".to_string(), "broadcast".to_string()],
|
||||||
});
|
});
|
||||||
|
|
||||||
broadcast_to_peer(addr, response).await?;
|
broadcast_to_peer(addr, response).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageType::NewTx(new_tx) => {
|
MessageType::NewTx(new_tx) => {
|
||||||
// Validation et broadcast de la transaction
|
// Validation et broadcast de la transaction
|
||||||
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx.transaction)?)?;
|
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx.transaction)?)?;
|
||||||
|
|
||||||
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||||
daemon.test_mempool_accept(&tx)?;
|
daemon.test_mempool_accept(&tx)?;
|
||||||
daemon.broadcast(&tx)?;
|
daemon.broadcast(&tx)?;
|
||||||
|
|
||||||
// Notification aux autres peers
|
// Notification aux autres peers
|
||||||
broadcast_message(BroadcastType::NewTx(new_tx), Some(addr)).await?;
|
broadcast_message(BroadcastType::NewTx(new_tx), Some(addr)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageType::Broadcast(broadcast) => {
|
MessageType::Broadcast(broadcast) => {
|
||||||
// Relay du message broadcast
|
// Relay du message broadcast
|
||||||
broadcast_message(BroadcastType::Custom(broadcast), Some(addr)).await?;
|
broadcast_message(BroadcastType::Custom(broadcast), Some(addr)).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -470,25 +737,25 @@ pub fn lock_members() -> Result<MutexGuard<'static, HashMap<OutPoint, Member>>>
|
|||||||
|
|
||||||
pub fn add_member(outpoint: OutPoint, public_key: PublicKey) -> Result<()> {
|
pub fn add_member(outpoint: OutPoint, public_key: PublicKey) -> Result<()> {
|
||||||
let mut members = lock_members()?;
|
let mut members = lock_members()?;
|
||||||
|
|
||||||
let member = Member {
|
let member = Member {
|
||||||
outpoint,
|
outpoint,
|
||||||
public_key,
|
public_key,
|
||||||
balance: Amount::ZERO,
|
balance: Amount::ZERO,
|
||||||
last_commit: None,
|
last_commit: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
members.insert(outpoint, member);
|
members.insert(outpoint, member);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_member_balance(outpoint: OutPoint, amount: Amount) -> Result<()> {
|
pub fn update_member_balance(outpoint: OutPoint, amount: Amount) -> Result<()> {
|
||||||
let mut members = lock_members()?;
|
let mut members = lock_members()?;
|
||||||
|
|
||||||
if let Some(member) = members.get_mut(&outpoint) {
|
if let Some(member) = members.get_mut(&outpoint) {
|
||||||
member.balance = amount;
|
member.balance = amount;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -520,17 +787,17 @@ impl StateFile {
|
|||||||
.write(true)
|
.write(true)
|
||||||
.truncate(true)
|
.truncate(true)
|
||||||
.open(&self.path)?;
|
.open(&self.path)?;
|
||||||
|
|
||||||
let stringified = serde_json::to_string(&json)?;
|
let stringified = serde_json::to_string(&json)?;
|
||||||
f.write_all(stringified.as_bytes())?;
|
f.write_all(stringified.as_bytes())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load(&self) -> Result<Value> {
|
fn load(&self) -> Result<Value> {
|
||||||
let mut f = fs::File::open(&self.path)?;
|
let mut f = fs::File::open(&self.path)?;
|
||||||
let mut content = vec![];
|
let mut content = vec![];
|
||||||
f.read_to_end(&mut content)?;
|
f.read_to_end(&mut content)?;
|
||||||
|
|
||||||
let res: Value = serde_json::from_slice(&content)?;
|
let res: Value = serde_json::from_slice(&content)?;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
@ -542,41 +809,41 @@ impl StateFile {
|
|||||||
### 1. Initialisation du service
|
### 1. Initialisation du service
|
||||||
|
|
||||||
```
|
```
|
||||||
main() → Config::read_from_file() → Daemon::connect() →
|
main() → Config::read_from_file() → Daemon::connect() →
|
||||||
SpWallet::new() → load_persistent_state() →
|
SpWallet::new() → load_persistent_state() →
|
||||||
spawn_handlers() → TcpListener::bind() → accept_loop()
|
spawn_handlers() → TcpListener::bind() → accept_loop()
|
||||||
```
|
```
|
||||||
|
|
||||||
### 2. Traitement d'une connexion WebSocket
|
### 2. Traitement d'une connexion WebSocket
|
||||||
|
|
||||||
```
|
```
|
||||||
TcpListener::accept() → handle_connection() →
|
TcpListener::accept() → handle_connection() →
|
||||||
tokio_tungstenite::accept_async() →
|
tokio_tungstenite::accept_async() →
|
||||||
PEERMAP.insert() → message_loop()
|
PEERMAP.insert() → message_loop()
|
||||||
```
|
```
|
||||||
|
|
||||||
### 3. Traitement d'un message
|
### 3. Traitement d'un message
|
||||||
|
|
||||||
```
|
```
|
||||||
Message::Text() → serde_json::from_str() →
|
Message::Text() → serde_json::from_str() →
|
||||||
process_message() → match MessageType →
|
process_message() → match MessageType →
|
||||||
handle_specific_message() → broadcast_to_peers()
|
handle_specific_message() → broadcast_to_peers()
|
||||||
```
|
```
|
||||||
|
|
||||||
### 4. Scan des blocs
|
### 4. Scan des blocs
|
||||||
|
|
||||||
```
|
```
|
||||||
handle_zmq() → scan_blocks() →
|
handle_zmq() → scan_blocks() →
|
||||||
get_filters_from_blindbit() →
|
get_filters_from_blindbit() →
|
||||||
check_transaction_alone() →
|
check_transaction_alone() →
|
||||||
process_sp_outputs() → update_wallet()
|
process_sp_outputs() → update_wallet()
|
||||||
```
|
```
|
||||||
|
|
||||||
### 5. Broadcast de transaction
|
### 5. Broadcast de transaction
|
||||||
|
|
||||||
```
|
```
|
||||||
NewTxMessage → deserialize() →
|
NewTxMessage → deserialize() →
|
||||||
test_mempool_accept() → broadcast() →
|
test_mempool_accept() → broadcast() →
|
||||||
broadcast_message() → send_to_all_peers()
|
broadcast_message() → send_to_all_peers()
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -697,9 +964,9 @@ const MAX_CONNECTIONS: usize = 1000;
|
|||||||
|
|
||||||
```rust
|
```rust
|
||||||
log::info!("Using wallet with address {}", our_sp_address);
|
log::info!("Using wallet with address {}", our_sp_address);
|
||||||
log::info!("Found {} outputs for a total balance of {}",
|
log::info!("Found {} outputs for a total balance of {}",
|
||||||
sp_wallet.get_outputs().len(), sp_wallet.get_balance());
|
sp_wallet.get_outputs().len(), sp_wallet.get_balance());
|
||||||
log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}",
|
log::warn!("Failed to connect to Bitcoin Core (attempt {}/{}): {}",
|
||||||
retry_count, MAX_RETRIES, e);
|
retry_count, MAX_RETRIES, e);
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -731,23 +998,23 @@ TcpStream::connect(ws_url).await?;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_config_parsing() {
|
fn test_config_parsing() {
|
||||||
let config = Config::read_from_file("test.conf").unwrap();
|
let config = Config::read_from_file("test.conf").unwrap();
|
||||||
assert_eq!(config.network, Network::Signet);
|
assert_eq!(config.network, Network::Signet);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_message_serialization() {
|
fn test_message_serialization() {
|
||||||
let message = MessageType::Handshake(HandshakeMessage {
|
let message = MessageType::Handshake(HandshakeMessage {
|
||||||
version: "1.0".to_string(),
|
version: "1.0".to_string(),
|
||||||
capabilities: vec!["silent_payments".to_string()],
|
capabilities: vec!["silent_payments".to_string()],
|
||||||
});
|
});
|
||||||
|
|
||||||
let serialized = serde_json::to_string(&message).unwrap();
|
let serialized = serde_json::to_string(&message).unwrap();
|
||||||
let deserialized: MessageType = serde_json::from_str(&serialized).unwrap();
|
let deserialized: MessageType = serde_json::from_str(&serialized).unwrap();
|
||||||
|
|
||||||
assert!(matches!(deserialized, MessageType::Handshake(_)));
|
assert!(matches!(deserialized, MessageType::Handshake(_)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -761,12 +1028,12 @@ async fn test_websocket_connection() {
|
|||||||
// Setup
|
// Setup
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
let addr = listener.local_addr().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
// Test
|
// Test
|
||||||
let client = tokio_tungstenite::connect_async(
|
let client = tokio_tungstenite::connect_async(
|
||||||
format!("ws://{}", addr)
|
format!("ws://{}", addr)
|
||||||
).await.unwrap();
|
).await.unwrap();
|
||||||
|
|
||||||
// Assertions
|
// Assertions
|
||||||
assert!(client.0.is_ok());
|
assert!(client.0.is_ok());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user