sdk_relay/src/sync.rs
Nicolas Cantu 4bfc51a284 feat: Ajout du support des relais externes via external_nodes.conf
- Ajout de la fonction load_external_config() pour charger la configuration externe
- Ajout de la fonction parse_external_config() pour parser le fichier TOML
- Modification de discover_relays() pour inclure les relais externes
- Support des relais avec ancienne version (0.9.0) et capacités limitées
- Ajout du fichier EXEMPLES_PRATIQUES.md avec exemples d'utilisation
- Mise à jour de la documentation technique
2025-08-22 16:54:58 +02:00

1244 lines
42 KiB
Rust

use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex, OnceLock},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use anyhow::{Error, Result};
use log::{debug, error, info, warn};
use tokio::time;
use uuid::Uuid;
use sdk_common::{
network::{AnkFlag, Envelope},
process::lock_processes,
serialization::{OutPointMemberMap, OutPointProcessMap},
MutexExt,
};
use crate::{
commit::lock_members,
message::{broadcast_message, BroadcastType},
CHAIN_TIP, WALLET,
};
// ===== TYPES DE SYNCHRONISATION =====
/// Kinds of synchronization message exchanged between relays.
///
/// `PartialEq`, `Eq` and `Hash` are derived so the type can be compared and
/// used as a `HashMap` key, which is how `SyncManager::last_sync` declares it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum SyncType {
    /// Global state synchronization.
    StateSync,
    /// Process synchronization.
    ProcessSync,
    /// Member synchronization.
    MemberSync,
    /// Transaction synchronization.
    TxSync,
    /// Block synchronization.
    BlockSync,
    /// Peer synchronization.
    PeerSync,
    /// Relay list synchronization.
    RelaySync,
    /// Relay health status.
    HealthSync,
    /// Performance metrics.
    MetricsSync,
    /// Configuration synchronization.
    ConfigSync,
    /// Capability synchronization.
    CapabilitySync,
}

impl SyncType {
    /// Returns the variant name as a string literal.
    ///
    /// The result is `'static`, so it may outlive the value it was taken from.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::StateSync => "StateSync",
            Self::ProcessSync => "ProcessSync",
            Self::MemberSync => "MemberSync",
            Self::TxSync => "TxSync",
            Self::BlockSync => "BlockSync",
            Self::PeerSync => "PeerSync",
            Self::RelaySync => "RelaySync",
            Self::HealthSync => "HealthSync",
            Self::MetricsSync => "MetricsSync",
            Self::ConfigSync => "ConfigSync",
            Self::CapabilitySync => "CapabilitySync",
        }
    }
}
/// Health status of a relay.
///
/// `SyncManager::update_relay_health` derives the first three values from the
/// CPU usage a relay reports; `discover_relays` starts every relay as `Healthy`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum HealthStatus {
/// Reported CPU usage is at most 60.
Healthy,
/// Reported CPU usage is above 60 and at most 80.
Warning,
/// Reported CPU usage is above 80.
Critical,
/// Never assigned by the code in this file.
Offline,
}
/// Information about a peer, as carried in `SyncPayload::PeerData`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PeerInfo {
// Address of the peer; `create_peer_sync` copies it from `RelayInfo::address`.
pub address: String,
// Copied from `RelayInfo::sp_address`.
// NOTE(review): presumably a silent-payment address — confirm.
pub sp_address: String,
// Unix time in seconds; `create_peer_sync` sets it to the moment the message is built.
pub connected_since: u64,
// Copied from `RelayInfo::last_seen` by `create_peer_sync`.
pub last_activity: u64,
// Always 0 in messages built by this file (counting is still a TODO).
pub message_count: u64,
// Capability labels; `discover_relays` uses "sync", "mesh" and "basic".
pub capabilities: Vec<String>,
}
/// Information about a relay, stored in `SyncManager::known_relays` and
/// shared with other relays through `SyncPayload::RelayData`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RelayInfo {
// Identifier used as the key in `known_relays`.
pub relay_id: String,
// Address of the relay; `discover_relays` builds local ones as "<host>:8090".
pub address: String,
// Left empty by `discover_relays`.
// NOTE(review): presumably a silent-payment address filled in later — confirm.
pub sp_address: String,
// Version string; `discover_relays` uses "1.0.0" for local and "0.9.0" for external relays.
pub version: String,
// Uptime as reported by the relay's health message (logged with an "s" suffix).
pub uptime: u64,
// Unix time in seconds of the last discovery or health update.
pub last_seen: u64,
// Capability labels such as "sync", "mesh" or "basic".
pub capabilities: Vec<String>,
// Last computed health classification, see `HealthStatus`.
pub health_status: HealthStatus,
}
/// Snapshot of the network topology as seen by one relay.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NetworkTopology {
// Number of known relays. `get_network_topology` adds 1 for the local relay,
// `create_relay_sync` does not.
pub total_relays: u32,
// Number of entries in the local mesh connection table.
pub connected_relays: u32,
// Copy of the mesh connection table.
pub mesh_connections: Vec<MeshConnection>,
// Estimate from `calculate_network_diameter` (square-root heuristic, not a real graph diameter).
pub network_diameter: u32,
// Mean of `MeshConnection::latency` over the table; 0.0 when the table is empty.
pub avg_latency: f64,
}
/// A mesh connection between two relays.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MeshConnection {
// Identifier of the relay at the origin of the link.
pub from_relay: String,
// Identifier of the relay at the other end of the link.
pub to_relay: String,
// Always 0.0 for connections created in this file (measurement is a TODO).
// NOTE(review): unit is not established here — confirm.
pub latency: f64,
// Always 0 for connections created in this file (measurement is a TODO).
pub bandwidth: u64,
// Unix time in seconds at which the connection entry was last refreshed.
pub last_heartbeat: u64,
}
/// Payload carried by a `SyncMessage`; one variant per kind of data.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SyncPayload {
// Global state
StateData {
chain_tip: u32,
wallet_balance: u64,
active_processes: u32,
connected_peers: u32,
},
// Process data
ProcessData {
processes: HashMap<String, String>, // Simplified OutPoint -> Process conversion: both sides are `{:?}`-formatted strings
last_update: u64,
},
// Member data
MemberData {
members: HashMap<String, String>, // Simplified OutPoint -> Member conversion: both sides are `{:?}`-formatted strings
last_update: u64,
},
// Health data
// `memory_usage` is logged after dividing by 1024 twice and labelled "MB",
// so it is presumably in bytes — confirm with the producer once implemented.
HealthData {
uptime: u64,
memory_usage: u64,
cpu_usage: f64,
active_connections: u32,
last_block_time: u64,
},
// Metrics data
MetricsData {
messages_processed: u64,
transactions_broadcast: u64,
blocks_scanned: u32,
errors_count: u32,
avg_response_time: f64,
},
// Peer data
PeerData {
peers: Vec<PeerInfo>,
last_seen: u64,
},
// Relay data
RelayData {
relays: Vec<RelayInfo>,
network_topology: NetworkTopology,
},
}
/// A synchronization message exchanged between relays.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncMessage {
    /// Kind of data carried by `payload`.
    pub sync_type: SyncType,
    /// Identifier of the relay that created the message.
    pub relay_id: String,
    /// Creation time, in seconds since the Unix epoch.
    pub timestamp: u64,
    /// Per-relay sequence number; part of the identifier used for de-duplication.
    pub sequence_number: u64,
    /// The data itself.
    pub payload: SyncPayload,
    /// Optional signature; never set by the code in this file.
    pub signature: Option<String>,
}

impl SyncMessage {
    /// Creates an unsigned message stamped with the current time.
    ///
    /// `sequence_number` is set to 0; callers that rely on de-duplication must
    /// assign a real sequence number themselves.
    ///
    /// If the system clock is set before the Unix epoch the timestamp falls
    /// back to 0 instead of panicking.
    pub fn new(sync_type: SyncType, relay_id: String, payload: SyncPayload) -> Self {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0);
        Self {
            sync_type,
            relay_id,
            timestamp,
            sequence_number: 0,
            payload,
            signature: None,
        }
    }

    /// Serializes the message to JSON.
    ///
    /// # Panics
    /// Panics if `serde_json` fails to serialize the message.
    pub fn to_string(&self) -> String {
        serde_json::to_string(self).unwrap()
    }
}
/// A request asking a relay to send back some kinds of synchronization data.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncRequest {
    /// Identifier echoed back in the matching `SyncResponse`.
    pub request_id: String,
    /// Identifier of the requesting relay.
    pub relay_id: String,
    /// Kinds of data being requested.
    pub sync_types: Vec<SyncType>,
    /// Optional lower time bound; not read by the code in this file.
    pub since_timestamp: Option<u64>,
    /// Optional item limit; not read by the code in this file.
    pub max_items: Option<u32>,
}

impl SyncRequest {
    /// Creates a request with no time bound and no item limit.
    pub fn new(request_id: String, relay_id: String, sync_types: Vec<SyncType>) -> Self {
        let since_timestamp = None;
        let max_items = None;
        Self { request_id, relay_id, sync_types, since_timestamp, max_items }
    }

    /// Serializes the request to JSON.
    ///
    /// # Panics
    /// Panics if `serde_json` fails to serialize the request.
    pub fn to_string(&self) -> String {
        let encoded = serde_json::to_string(self);
        encoded.unwrap()
    }
}
/// The answer to a `SyncRequest`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncResponse {
    /// Identifier of the request being answered.
    pub request_id: String,
    /// Identifier of the answering relay.
    pub relay_id: String,
    /// Whether the request was served successfully.
    pub success: bool,
    /// Synchronization messages produced for the request.
    pub messages: Vec<SyncMessage>,
    /// Optional error description.
    pub error: Option<String>,
}

impl SyncResponse {
    /// Creates a response with no messages and no error text.
    pub fn new(request_id: String, relay_id: String, success: bool) -> Self {
        let messages = Vec::new();
        Self { request_id, relay_id, success, messages, error: None }
    }

    /// Serializes the response to JSON.
    ///
    /// # Panics
    /// Panics if `serde_json` fails to serialize the response.
    pub fn to_string(&self) -> String {
        let encoded = serde_json::to_string(self);
        encoded.unwrap()
    }
}
// ===== GESTIONNAIRE DE SYNCHRONISATION =====
/// Synchronization manager for the relay mesh network.
///
/// Every piece of mutable state sits behind its own `std::sync::Mutex`. These
/// mutexes are not re-entrant: a method must not call another method that
/// locks the same field while it still holds the guard.
#[derive(Debug)]
pub struct SyncManager {
// Identifier of this relay, generated in `new` as "relay-<first UUID group>".
relay_id: String,
// Last sequence number handed out by `next_sequence`.
sequence_counter: Arc<Mutex<u64>>,
// Message id -> instant at which it was cached; used to drop duplicates.
sync_cache: Arc<Mutex<HashMap<String, Instant>>>,
// Created empty in `new`; not read or written anywhere else in this file.
last_sync: Arc<Mutex<HashMap<SyncType, u64>>>,
// Mesh connection table, keyed by relay id or by "<from>-<to>" depending on the writer.
mesh_connections: Arc<Mutex<HashMap<String, MeshConnection>>>,
// Known relays keyed by relay id.
known_relays: Arc<Mutex<HashMap<String, RelayInfo>>>,
// Counters describing the synchronization activity.
metrics: Arc<Mutex<SyncMetrics>>,
}
/// Counters describing the synchronization activity of a relay.
///
/// `Default` yields the same all-zero value as [`SyncMetrics::new`].
#[derive(Debug, Clone, Default)]
pub struct SyncMetrics {
    /// Number of synchronization messages sent.
    pub messages_sent: u64,
    /// Number of synchronization messages received.
    pub messages_received: u64,
    /// Number of synchronization requests.
    pub sync_requests: u64,
    /// Number of synchronization responses.
    pub sync_responses: u64,
    /// Number of errors.
    pub errors: u64,
    /// Time of the last send, in seconds since the Unix epoch (0 = never).
    pub last_sync_time: u64,
    /// Average synchronization latency.
    pub avg_sync_latency: f64,
}

impl SyncMetrics {
    /// Creates a metrics record with every counter set to zero.
    pub fn new() -> Self {
        Self::default()
    }
}
impl SyncManager {
/// Creates a manager with empty tables and a freshly generated relay id.
///
/// The id is "relay-" followed by the first hyphen-separated group of a
/// random v4 UUID.
pub fn new() -> Self {
    let uuid_text = Uuid::new_v4().to_string();
    let short_id = uuid_text.split('-').next().unwrap();
    let relay_id = format!("relay-{}", short_id);
    info!("Initialisation du gestionnaire de synchronisation avec ID: {}", relay_id);

    let sequence_counter = Arc::new(Mutex::new(0));
    let sync_cache = Arc::new(Mutex::new(HashMap::new()));
    let last_sync = Arc::new(Mutex::new(HashMap::new()));
    let mesh_connections = Arc::new(Mutex::new(HashMap::new()));
    let known_relays = Arc::new(Mutex::new(HashMap::new()));
    let metrics = Arc::new(Mutex::new(SyncMetrics::new()));

    Self {
        relay_id,
        sequence_counter,
        sync_cache,
        last_sync,
        mesh_connections,
        known_relays,
        metrics,
    }
}
/// Returns the next sequence number; the first call yields 1.
pub fn next_sequence(&self) -> u64 {
    let mut current = self.sequence_counter.lock().unwrap();
    *current += 1;
    let issued = *current;
    issued
}
/// Tells whether `message_id` is already present in the de-duplication cache.
pub fn is_duplicate(&self, message_id: &str) -> bool {
    self.sync_cache.lock().unwrap().get(message_id).is_some()
}
/// Records `message_id` in the de-duplication cache, stamped with the current instant.
pub fn cache_message(&self, message_id: String) {
    let seen_at = Instant::now();
    self.sync_cache.lock().unwrap().insert(message_id, seen_at);
}
/// Runs forever, evicting cache entries older than 5 minutes on a 30-second timer.
///
/// The cache lock is taken after each tick and released before the next
/// await, so it is never held across a suspension point.
pub async fn cleanup_cache(&self) {
    let max_age = Duration::from_secs(300); // 5 minutes
    let mut ticker = time::interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        let mut entries = self.sync_cache.lock().unwrap();
        let reference = Instant::now();
        entries.retain(|_, cached_at| reference.duration_since(*cached_at) <= max_age);
        debug!("Cache de synchronisation nettoyé, {} messages conservés", entries.len());
    }
}
// ===== CRÉATION DE MESSAGES DE SYNCHRONISATION =====
/// Builds a `StateSync` message describing the local chain tip, wallet
/// balance, process count and peer count.
///
/// # Errors
/// Propagates failures from the data getters and from reading the system clock.
pub fn create_state_sync(&self) -> Result<SyncMessage> {
    let payload = SyncPayload::StateData {
        chain_tip: CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst),
        wallet_balance: self.get_wallet_balance()?,
        active_processes: self.get_active_processes_count()?,
        connected_peers: self.get_connected_peers_count()?,
    };
    let relay_id = self.relay_id.clone();
    // The clock is read before a sequence number is consumed, so a clock
    // error does not burn a number.
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let sequence_number = self.next_sequence();
    Ok(SyncMessage {
        sync_type: SyncType::StateSync,
        relay_id,
        timestamp,
        sequence_number,
        payload,
        signature: None,
    })
}
/// Crée un message de synchronisation de processus
pub fn create_process_sync(&self) -> Result<SyncMessage> {
let processes = lock_processes()?;
let last_update = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
// Conversion simplifiée pour l'instant
let processes_map: HashMap<String, String> = processes.iter()
.map(|(k, v)| (format!("{:?}", k), format!("{:?}", v)))
.collect();
let payload = SyncPayload::ProcessData {
processes: processes_map,
last_update,
};
Ok(SyncMessage::new(SyncType::ProcessSync, self.relay_id.clone(), payload))
}
/// Crée un message de synchronisation de membres
pub fn create_member_sync(&self) -> Result<SyncMessage> {
let members = lock_members()?;
let last_update = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
// Conversion simplifiée pour l'instant
let members_map: HashMap<String, String> = members.iter()
.map(|(k, v)| (format!("{:?}", k), format!("{:?}", v)))
.collect();
let payload = SyncPayload::MemberData {
members: members_map,
last_update,
};
Ok(SyncMessage::new(SyncType::MemberSync, self.relay_id.clone(), payload))
}
/// Builds a `HealthSync` message from the local health getters.
///
/// # Errors
/// Propagates failures from the getters and from reading the system clock.
pub fn create_health_sync(&self) -> Result<SyncMessage> {
    let payload = SyncPayload::HealthData {
        uptime: self.get_uptime()?,
        memory_usage: self.get_memory_usage()?,
        cpu_usage: self.get_cpu_usage()?,
        active_connections: self.get_connected_peers_count()?,
        last_block_time: self.get_last_block_time()?,
    };
    let relay_id = self.relay_id.clone();
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let sequence_number = self.next_sequence();
    Ok(SyncMessage {
        sync_type: SyncType::HealthSync,
        relay_id,
        timestamp,
        sequence_number,
        payload,
        signature: None,
    })
}
/// Builds a `MetricsSync` message from the local synchronization metrics.
///
/// The error counter is a `u64` locally but a `u32` on the wire; it saturates
/// at `u32::MAX` instead of wrapping. The metrics lock is released as soon as
/// the payload has been copied out.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch.
pub fn create_metrics_sync(&self) -> Result<SyncMessage> {
    let payload = {
        let metrics = self.metrics.lock().unwrap();
        SyncPayload::MetricsData {
            messages_processed: metrics.messages_received,
            transactions_broadcast: 0, // TODO: implement the counter
            blocks_scanned: 0, // TODO: implement the counter
            errors_count: u32::try_from(metrics.errors).unwrap_or(u32::MAX),
            avg_response_time: metrics.avg_sync_latency,
        }
    };
    Ok(SyncMessage {
        sync_type: SyncType::MetricsSync,
        relay_id: self.relay_id.clone(),
        timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
        sequence_number: self.next_sequence(),
        payload,
        signature: None,
    })
}
/// Crée un message de synchronisation des relais (partage de la liste des relais)
pub fn create_relay_sync(&self) -> Result<SyncMessage> {
let known_relays = self.known_relays.lock().unwrap();
let mesh_connections = self.mesh_connections.lock().unwrap();
let relays: Vec<RelayInfo> = known_relays.values().cloned().collect();
let network_topology = NetworkTopology {
total_relays: known_relays.len() as u32,
connected_relays: mesh_connections.len() as u32,
mesh_connections: mesh_connections.values().cloned().collect(),
network_diameter: self.calculate_network_diameter(),
avg_latency: self.calculate_avg_latency(),
};
let payload = SyncPayload::RelayData {
relays,
network_topology,
};
Ok(SyncMessage::new(SyncType::RelaySync, self.relay_id.clone(), payload))
}
/// Crée un message de synchronisation des pairs
pub fn create_peer_sync(&self) -> Result<SyncMessage> {
let known_relays = self.known_relays.lock().unwrap();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
let peers: Vec<PeerInfo> = known_relays.values()
.map(|relay| PeerInfo {
address: relay.address.clone(),
sp_address: relay.sp_address.clone(),
connected_since: now,
last_activity: relay.last_seen,
message_count: 0, // TODO: Implémenter le comptage
capabilities: relay.capabilities.clone(),
})
.collect();
let payload = SyncPayload::PeerData {
peers,
last_seen: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
};
Ok(SyncMessage::new(SyncType::PeerSync, self.relay_id.clone(), payload))
}
// ===== TRAITEMENT DES MESSAGES DE SYNCHRONISATION =====
/// Traite un message de synchronisation reçu
pub fn process_sync_message(&self, sync_msg: SyncMessage, sender_addr: SocketAddr) -> Result<()> {
let message_id = format!("{}-{}-{}", sync_msg.relay_id, sync_msg.sync_type.as_str(), sync_msg.sequence_number);
if self.is_duplicate(&message_id) {
debug!("Message de synchronisation en double ignoré: {}", message_id);
return Ok(());
}
self.cache_message(message_id);
self.update_metrics_received();
info!("Traitement du message de synchronisation {} de {}", sync_msg.sync_type.as_str(), sync_msg.relay_id);
match sync_msg.sync_type {
SyncType::StateSync => self.handle_state_sync(&sync_msg, sender_addr)?,
SyncType::ProcessSync => self.handle_process_sync(&sync_msg, sender_addr)?,
SyncType::MemberSync => self.handle_member_sync(&sync_msg, sender_addr)?,
SyncType::HealthSync => self.handle_health_sync(&sync_msg, sender_addr)?,
SyncType::MetricsSync => self.handle_metrics_sync(&sync_msg, sender_addr)?,
SyncType::PeerSync => self.handle_peer_sync(&sync_msg, sender_addr)?,
SyncType::RelaySync => self.handle_relay_sync(&sync_msg, sender_addr)?,
_ => {
warn!("Type de synchronisation non implémenté: {:?}", sync_msg.sync_type);
}
}
// Relay du message aux autres pairs
self.relay_sync_message(sync_msg, sender_addr)?;
Ok(())
}
/// Handles a synchronization request by building one message per requested
/// type and sending them back to the requester.
///
/// Only state, process, member, health and metrics data can be requested;
/// other types are logged and skipped. A message that cannot be built is no
/// longer dropped silently: the failure is logged, `success` is set to
/// `false` and the reasons are put in the response's `error` field. Messages
/// that were built are still returned.
///
/// # Errors
/// Propagates errors from sending the response.
pub fn process_sync_request(&self, request: SyncRequest, sender_addr: SocketAddr) -> Result<()> {
    info!("Traitement de la requête de synchronisation de {}", request.relay_id);
    let mut response = SyncResponse::new(
        request.request_id.clone(),
        self.relay_id.clone(),
        true,
    );
    let mut failures: Vec<String> = Vec::new();
    for sync_type in &request.sync_types {
        let built = match sync_type {
            SyncType::StateSync => self.create_state_sync(),
            SyncType::ProcessSync => self.create_process_sync(),
            SyncType::MemberSync => self.create_member_sync(),
            SyncType::HealthSync => self.create_health_sync(),
            SyncType::MetricsSync => self.create_metrics_sync(),
            SyncType::TxSync
            | SyncType::BlockSync
            | SyncType::PeerSync
            | SyncType::RelaySync
            | SyncType::ConfigSync
            | SyncType::CapabilitySync => {
                warn!("Type de synchronisation non supporté dans la requête: {:?}", sync_type);
                continue;
            }
        };
        match built {
            Ok(sync_msg) => response.messages.push(sync_msg),
            Err(e) => {
                warn!(
                    "Échec de la création du message {} pour la requête {}: {}",
                    sync_type.as_str(),
                    request.request_id,
                    e
                );
                failures.push(format!("{}: {}", sync_type.as_str(), e));
            }
        }
    }
    if !failures.is_empty() {
        response.success = false;
        response.error = Some(failures.join("; "));
    }
    // Send the response.
    self.send_sync_response(response, sender_addr)?;
    Ok(())
}
// ===== HANDLERS SPÉCIFIQUES =====
/// Logs a received state payload and refreshes the mesh connection to its sender.
///
/// Messages whose payload is not `StateData` are ignored.
fn handle_state_sync(&self, sync_msg: &SyncMessage, sender_addr: SocketAddr) -> Result<()> {
    let SyncPayload::StateData { chain_tip, wallet_balance, active_processes, connected_peers } =
        &sync_msg.payload
    else {
        return Ok(());
    };
    debug!("État reçu de {}: chain_tip={}, balance={}, processes={}, peers={}",
        sync_msg.relay_id, chain_tip, wallet_balance, active_processes, connected_peers);
    // Refresh the network topology entry for the sender.
    self.update_mesh_connection(&sync_msg.relay_id, sender_addr)?;
    Ok(())
}
/// Logs a received process payload; merging into local state is not implemented yet.
///
/// Messages whose payload is not `ProcessData` are ignored.
fn handle_process_sync(&self, sync_msg: &SyncMessage, _sender_addr: SocketAddr) -> Result<()> {
    match &sync_msg.payload {
        SyncPayload::ProcessData { processes, last_update } => {
            debug!("Processus reçus de {}: {} processus, mis à jour à {}",
                sync_msg.relay_id, processes.len(), last_update);
            // TODO: merge the processes if needed
        }
        _ => {}
    }
    Ok(())
}
/// Logs a received member payload; merging into local state is not implemented yet.
///
/// Messages whose payload is not `MemberData` are ignored.
fn handle_member_sync(&self, sync_msg: &SyncMessage, _sender_addr: SocketAddr) -> Result<()> {
    match &sync_msg.payload {
        SyncPayload::MemberData { members, last_update } => {
            debug!("Membres reçus de {}: {} membres, mis à jour à {}",
                sync_msg.relay_id, members.len(), last_update);
            // TODO: merge the members if needed
        }
        _ => {}
    }
    Ok(())
}
/// Logs a received health payload and updates the sender's health record.
///
/// Messages whose payload is not `HealthData` are ignored.
fn handle_health_sync(&self, sync_msg: &SyncMessage, sender_addr: SocketAddr) -> Result<()> {
    let SyncPayload::HealthData { uptime, memory_usage, cpu_usage, active_connections, last_block_time: _ } =
        &sync_msg.payload
    else {
        return Ok(());
    };
    debug!("Santé reçue de {}: uptime={}s, mem={}MB, cpu={}%, connections={}",
        sync_msg.relay_id, uptime, memory_usage / 1024 / 1024, cpu_usage, active_connections);
    // Update the topology with the health information.
    self.update_relay_health(&sync_msg.relay_id, sender_addr, *uptime, *cpu_usage)?;
    Ok(())
}
/// Logs a received metrics payload; nothing is stored.
///
/// Messages whose payload is not `MetricsData` are ignored.
fn handle_metrics_sync(&self, sync_msg: &SyncMessage, _sender_addr: SocketAddr) -> Result<()> {
    match &sync_msg.payload {
        SyncPayload::MetricsData {
            messages_processed,
            transactions_broadcast,
            blocks_scanned,
            errors_count,
            avg_response_time,
        } => {
            debug!("Métriques reçues de {}: msgs={}, txs={}, blocks={}, errors={}, avg_time={}ms",
                sync_msg.relay_id, messages_processed, transactions_broadcast, blocks_scanned, errors_count, avg_response_time);
        }
        _ => {}
    }
    Ok(())
}
/// Logs a received peer payload; updating the peer list is not implemented yet.
///
/// Messages whose payload is not `PeerData` are ignored.
fn handle_peer_sync(&self, sync_msg: &SyncMessage, _sender_addr: SocketAddr) -> Result<()> {
    match &sync_msg.payload {
        SyncPayload::PeerData { peers, last_seen } => {
            debug!("Pairs reçus de {}: {} pairs, dernière vue à {}",
                sync_msg.relay_id, peers.len(), last_seen);
            // TODO: update the peer list
        }
        _ => {}
    }
    Ok(())
}
/// Merges a received relay list and topology into the local tables, then
/// refreshes the mesh connection to the sender.
///
/// Messages whose payload is not `RelayData` are ignored.
fn handle_relay_sync(&self, sync_msg: &SyncMessage, sender_addr: SocketAddr) -> Result<()> {
    let SyncPayload::RelayData { relays, network_topology } = &sync_msg.payload else {
        return Ok(());
    };
    debug!("Relais reçus de {}: {} relais, topologie: {} relais connectés",
        sync_msg.relay_id, relays.len(), network_topology.connected_relays);
    // 1. known relays, 2. network topology, 3. connection to the sending relay.
    self.update_known_relays(relays)?;
    self.update_network_topology(network_topology)?;
    self.update_mesh_connection(&sync_msg.relay_id, sender_addr)?;
    Ok(())
}
// ===== UTILITAIRES =====
/// Re-broadcasts a synchronization message with `BroadcastType::ExcludeSender`
/// and counts it as sent.
///
/// # Errors
/// Propagates the error returned by `broadcast_message`.
fn relay_sync_message(&self, sync_msg: SyncMessage, sender_addr: SocketAddr) -> Result<()> {
    let serialized = sync_msg.to_string();
    let outgoing = Envelope {
        flag: AnkFlag::Sync,
        content: serialized,
    };
    let target = BroadcastType::ExcludeSender(sender_addr);
    broadcast_message(AnkFlag::Sync, outgoing.content, target)?;
    self.update_metrics_sent();
    Ok(())
}
/// Sends a synchronization response with `BroadcastType::Sender` and counts
/// it as sent.
///
/// # Errors
/// Propagates the error returned by `broadcast_message`.
fn send_sync_response(&self, response: SyncResponse, sender_addr: SocketAddr) -> Result<()> {
    let serialized = response.to_string();
    let outgoing = Envelope {
        flag: AnkFlag::Sync,
        content: serialized,
    };
    let target = BroadcastType::Sender(sender_addr);
    broadcast_message(AnkFlag::Sync, outgoing.content, target)?;
    self.update_metrics_sent();
    Ok(())
}
/// Inserts or replaces the mesh connection from this relay to `relay_id`,
/// stamped with the current time. Latency and bandwidth are placeholders.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch; nothing is inserted then.
fn update_mesh_connection(&self, relay_id: &str, _addr: SocketAddr) -> Result<()> {
    let mut table = self.mesh_connections.lock().unwrap();
    let last_heartbeat = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let peer = relay_id.to_string();
    table.insert(
        peer.clone(),
        MeshConnection {
            from_relay: self.relay_id.clone(),
            to_relay: peer,
            latency: 0.0, // TODO: measure latency
            bandwidth: 0, // TODO: measure bandwidth
            last_heartbeat,
        },
    );
    Ok(())
}
/// Updates uptime, last-seen time and health status of an already known relay.
///
/// Unknown relay ids are ignored. The status is `Critical` above 80 % CPU,
/// `Warning` above 60 %, and `Healthy` otherwise.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch.
fn update_relay_health(&self, relay_id: &str, _addr: SocketAddr, uptime: u64, cpu_usage: f64) -> Result<()> {
    debug!("Mise à jour de la santé du relais {}: uptime={}s, cpu={}%", relay_id, uptime, cpu_usage);
    let mut registry = self.known_relays.lock().unwrap();
    let Some(entry) = registry.get_mut(relay_id) else {
        return Ok(());
    };
    entry.uptime = uptime;
    entry.last_seen = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    entry.health_status = match cpu_usage {
        load if load > 80.0 => HealthStatus::Critical,
        load if load > 60.0 => HealthStatus::Warning,
        _ => HealthStatus::Healthy,
    };
    Ok(())
}
/// Inserts or replaces each received relay in the known-relay table,
/// skipping any entry that carries this relay's own id.
fn update_known_relays(&self, relays: &[RelayInfo]) -> Result<()> {
    let mut registry = self.known_relays.lock().unwrap();
    let others = relays.iter().filter(|candidate| candidate.relay_id != self.relay_id);
    for other in others {
        registry.insert(other.relay_id.clone(), other.clone());
        debug!("Relais ajouté/mis à jour: {} ({})", other.relay_id, other.address);
    }
    info!("Liste des relais mise à jour: {} relais connus", registry.len());
    Ok(())
}
/// Merges the remote mesh connections into the local table under the key
/// "<from>-<to>", skipping links that have this relay at either end.
fn update_network_topology(&self, topology: &NetworkTopology) -> Result<()> {
    let mut table = self.mesh_connections.lock().unwrap();
    let foreign_links = topology
        .mesh_connections
        .iter()
        .filter(|link| link.from_relay != self.relay_id && link.to_relay != self.relay_id);
    for link in foreign_links {
        table.insert(format!("{}-{}", link.from_relay, link.to_relay), link.clone());
    }
    debug!("Topologie réseau mise à jour: {} connexions mesh", table.len());
    Ok(())
}
/// Estimates the network diameter from the number of known relays.
///
/// This is a heuristic, not a graph computation: 0 for at most one known
/// relay, otherwise the ceiling of the square root of the relay count.
/// Locks `known_relays`.
fn calculate_network_diameter(&self) -> u32 {
    let relay_count = self.known_relays.lock().unwrap().len();
    match relay_count {
        0 | 1 => 0,
        n => ((n as f64).sqrt().ceil() as u32).max(1),
    }
}
/// Returns the mean latency over all mesh connections, or 0.0 when there are none.
/// Locks `mesh_connections`.
fn calculate_avg_latency(&self) -> f64 {
    let table = self.mesh_connections.lock().unwrap();
    if table.is_empty() {
        return 0.0;
    }
    let sum: f64 = table.values().map(|link| link.latency).sum();
    sum / table.len() as f64
}
// ===== MÉTRIQUES =====
/// Counts one sent message and records the current time as the last sync time.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch.
fn update_metrics_sent(&self) {
    let mut counters = self.metrics.lock().unwrap();
    counters.messages_sent += 1;
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    counters.last_sync_time = since_epoch.as_secs();
}
/// Counts one received message.
fn update_metrics_received(&self) {
    self.metrics.lock().unwrap().messages_received += 1;
}
// ===== OBTENTION DES DONNÉES SYSTÈME =====
fn get_wallet_balance(&self) -> Result<u64> {
let _wallet = WALLET.get().ok_or(Error::msg("Wallet non initialisé"))?.lock_anyhow()?;
// TODO: Implémenter la récupération du solde
Ok(0)
}
/// Returns the number of entries in the process table.
///
/// # Errors
/// Fails if the process table cannot be locked.
fn get_active_processes_count(&self) -> Result<u32> {
    let count = lock_processes()?.len();
    Ok(count as u32)
}
/// Returns the number of connected peers; placeholder that always yields 0.
fn get_connected_peers_count(&self) -> Result<u32> {
    // TODO: implement access to the connected peers
    let connected: u32 = 0;
    Ok(connected)
}
/// Placeholder for the relay uptime.
///
/// NOTE(review): this currently returns the seconds elapsed since the Unix
/// epoch, not the time since the relay started.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch.
fn get_uptime(&self) -> Result<u64> {
    // TODO: implement the uptime computation
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
    Ok(since_epoch.as_secs())
}
/// Returns the memory usage; placeholder that always yields 0.
fn get_memory_usage(&self) -> Result<u64> {
    // TODO: implement memory usage retrieval
    let used: u64 = 0;
    Ok(used)
}
/// Returns the CPU usage; placeholder that always yields 0.0.
fn get_cpu_usage(&self) -> Result<f64> {
    // TODO: implement CPU usage retrieval
    let load: f64 = 0.0;
    Ok(load)
}
/// Placeholder for the time of the last block; currently the present time in
/// seconds since the Unix epoch.
///
/// # Errors
/// Fails if the system clock is before the Unix epoch.
fn get_last_block_time(&self) -> Result<u64> {
    // TODO: implement retrieval of the last block time
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
    Ok(since_epoch.as_secs())
}
// ===== API PUBLIQUE =====
/// Démarre la synchronisation périodique
pub async fn start_periodic_sync(&self) {
let mut interval = time::interval(Duration::from_secs(60)); // Toutes les minutes
info!("Démarrage de la synchronisation périodique");
loop {
interval.tick().await;
if let Err(e) = self.perform_periodic_sync().await {
error!("Erreur lors de la synchronisation périodique: {}", e);
}
}
}
/// Effectue une synchronisation périodique
async fn perform_periodic_sync(&self) -> Result<()> {
debug!("Synchronisation périodique en cours...");
// Envoi des messages de santé
if let Ok(health_msg) = self.create_health_sync() {
let envelope = Envelope {
flag: AnkFlag::Sync,
content: health_msg.to_string(),
};
broadcast_message(
AnkFlag::Sync,
envelope.content,
BroadcastType::ToAll,
)?;
}
// Envoi des métriques
if let Ok(metrics_msg) = self.create_metrics_sync() {
let envelope = Envelope {
flag: AnkFlag::Sync,
content: metrics_msg.to_string(),
};
broadcast_message(
AnkFlag::Sync,
envelope.content,
BroadcastType::ToAll,
)?;
}
// Envoi de la liste des relais (moins fréquent - toutes les 5 minutes)
static mut LAST_RELAY_SYNC: u64 = 0;
let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
unsafe {
if now - LAST_RELAY_SYNC > 300 { // 5 minutes
if let Ok(relay_msg) = self.create_relay_sync() {
let envelope = Envelope {
flag: AnkFlag::Sync,
content: relay_msg.to_string(),
};
broadcast_message(
AnkFlag::Sync,
envelope.content,
BroadcastType::ToAll,
)?;
LAST_RELAY_SYNC = now;
debug!("Liste des relais partagée avec le réseau");
}
}
}
Ok(())
}
/// Sends a synchronization request for `sync_types` to `target_addr` and
/// counts it as a sent message. The request id is a fresh v4 UUID.
///
/// # Errors
/// Propagates the error returned by `broadcast_message`.
pub fn request_sync(&self, sync_types: Vec<SyncType>, target_addr: SocketAddr) -> Result<()> {
    let request = SyncRequest::new(
        Uuid::new_v4().to_string(),
        self.relay_id.clone(),
        sync_types,
    );
    let outgoing = Envelope {
        flag: AnkFlag::Sync,
        content: request.to_string(),
    };
    let target = BroadcastType::Sender(target_addr);
    broadcast_message(AnkFlag::Sync, outgoing.content, target)?;
    self.update_metrics_sent();
    Ok(())
}
/// Returns a copy of the current synchronization metrics.
pub fn get_metrics(&self) -> SyncMetrics {
    let counters = self.metrics.lock().unwrap();
    counters.clone()
}
/// Returns a copy of this relay's identifier.
pub fn get_relay_id(&self) -> String {
    let id = &self.relay_id;
    id.clone()
}
/// Returns a copy of every known relay, in unspecified order.
pub fn get_known_relays(&self) -> Vec<RelayInfo> {
    let registry = self.known_relays.lock().unwrap();
    registry.values().cloned().collect()
}
/// Obtient les informations de topologie du réseau
pub fn get_network_topology(&self) -> NetworkTopology {
let known_relays = self.known_relays.lock().unwrap();
let mesh_connections = self.mesh_connections.lock().unwrap();
NetworkTopology {
total_relays: known_relays.len() as u32 + 1, // +1 pour nous-mêmes
connected_relays: mesh_connections.len() as u32,
mesh_connections: mesh_connections.values().cloned().collect(),
network_diameter: self.calculate_network_diameter(),
avg_latency: self.calculate_avg_latency(),
}
}
/// Inserts or replaces a relay in the known-relay table.
///
/// A relay carrying this relay's own id is ignored.
pub fn add_relay(&self, relay_info: RelayInfo) -> Result<()> {
    let is_self = relay_info.relay_id == self.relay_id;
    if !is_self {
        let mut registry = self.known_relays.lock().unwrap();
        registry.insert(relay_info.relay_id.clone(), relay_info.clone());
        info!("Nouveau relais ajouté: {} ({})", relay_info.relay_id, relay_info.address);
    }
    Ok(())
}
/// Découvre automatiquement les autres relais dans le réseau
pub async fn discover_relays(&self) -> Result<()> {
info!("🔍 Découverte automatique des relais...");
// Découverte locale (basée sur les noms de conteneurs Docker)
let relay_hosts = vec![
"sdk_relay_1",
"sdk_relay_2",
"sdk_relay_3",
];
for host in relay_hosts {
if host == self.relay_id {
continue; // Ignorer soi-même
}
let relay_info = RelayInfo {
relay_id: host.to_string(),
address: format!("{}:8090", host),
sp_address: "".to_string(), // Sera rempli lors de la synchronisation
version: "1.0.0".to_string(),
uptime: 0,
last_seen: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
capabilities: vec!["sync".to_string(), "mesh".to_string()],
health_status: HealthStatus::Healthy,
};
self.add_relay(relay_info)?;
}
// Découverte externe (depuis external_nodes.conf)
if let Ok(external_relays) = self.load_external_config() {
info!("🌐 Chargement de {} relais externes", external_relays.len());
for (relay_id, address) in external_relays {
let relay_info = RelayInfo {
relay_id: relay_id.clone(),
address: address.clone(),
sp_address: "".to_string(), // Sera rempli lors de la synchronisation
version: "0.9.0".to_string(), // Ancienne version pour compatibilité
uptime: 0,
last_seen: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
capabilities: vec!["basic".to_string()], // Capacités limitées
health_status: HealthStatus::Healthy,
};
self.add_relay(relay_info)?;
info!("✅ Relais externe ajouté: {} -> {}", relay_id, address);
}
} else {
info!("⚠️ Aucun fichier de configuration externe trouvé");
}
info!("✅ Découverte terminée: {} relais connus", self.known_relays.lock().unwrap().len());
Ok(())
}
/// Charge la configuration des relais externes
fn load_external_config(&self) -> Result<HashMap<String, String>> {
use std::fs;
use std::path::Path;
// Chercher le fichier de configuration
let config_paths = vec![
"external_nodes.conf",
"sdk_relay/external_nodes.conf",
"../sdk_relay/external_nodes.conf",
];
for path in config_paths {
if Path::new(path).exists() {
info!("📁 Chargement de la configuration depuis: {}", path);
return self.parse_external_config(path);
}
}
Err(anyhow::anyhow!("Aucun fichier de configuration externe trouvé"))
}
/// Parses the external relay configuration file.
///
/// Only a small INI-like subset is understood; this is not a full TOML parser:
/// - blank lines and lines starting with `#` are ignored;
/// - only `key = value` lines inside the `[relays]` section are read;
/// - any other `[section]` header ends the `[relays]` section;
/// - surrounding double quotes are stripped from the value.
///
/// The line is split at the first `=` only, so a value that itself contains
/// `=` (for example a URL with a query string) is kept instead of being
/// silently dropped.
///
/// Returns a map from relay id to address. Entries with an empty id or an
/// empty address are skipped.
///
/// # Errors
/// Fails if the file cannot be opened or a line cannot be read.
fn parse_external_config(&self, path: &str) -> Result<HashMap<String, String>> {
    use std::fs::File;
    use std::io::{BufRead, BufReader};

    let reader = BufReader::new(File::open(path)?);
    let mut relays = HashMap::new();
    let mut in_relays_section = false;

    for line in reader.lines() {
        let line = line?;
        let line = line.trim();

        // Skip comments and blank lines.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // A section header switches the [relays] section on or off.
        if line.starts_with('[') {
            in_relays_section = line == "[relays]";
            continue;
        }
        if !in_relays_section {
            continue;
        }
        // Relay entry: split at the first '=' only.
        let Some((key, value)) = line.split_once('=') else {
            continue;
        };
        let relay_id = key.trim();
        let address = value.trim().trim_matches('"');
        if !relay_id.is_empty() && !address.is_empty() {
            relays.insert(relay_id.to_string(), address.to_string());
        }
    }
    Ok(relays)
}
}
// ===== INSTANCE GLOBALE =====
/// Process-wide `SyncManager` instance.
///
/// It is not initialised in this file; `get_sync_manager` panics until some
/// other code has set it.
pub static SYNC_MANAGER: OnceLock<SyncManager> = OnceLock::new();
/// Returns the global `SyncManager`.
///
/// # Panics
/// Panics if `SYNC_MANAGER` has not been initialised.
pub fn get_sync_manager() -> &'static SyncManager {
    match SYNC_MANAGER.get() {
        Some(manager) => manager,
        None => panic!("SyncManager non initialisé"),
    }
}
// ===== TRAITEMENT DES MESSAGES =====
/// Entry point for raw synchronization traffic.
///
/// The JSON text is tried, in order, as a `SyncMessage`, a `SyncRequest` and a
/// `SyncResponse`. Messages and requests go to the global `SyncManager`;
/// responses are only logged; anything else is logged as unrecognised and
/// treated as success.
///
/// # Panics
/// Panics if the global `SyncManager` has not been initialised.
pub fn process_sync_message(raw_msg: &str, addr: SocketAddr) -> Result<()> {
    let manager = get_sync_manager();
    if let Ok(message) = serde_json::from_str::<SyncMessage>(raw_msg) {
        // Synchronization message.
        manager.process_sync_message(message, addr)
    } else if let Ok(request) = serde_json::from_str::<SyncRequest>(raw_msg) {
        // Synchronization request.
        manager.process_sync_request(request, addr)
    } else if let Ok(response) = serde_json::from_str::<SyncResponse>(raw_msg) {
        // Synchronization response.
        debug!("Réponse de synchronisation reçue de {}: {} messages",
            response.relay_id, response.messages.len());
        Ok(())
    } else {
        warn!("Message de synchronisation non reconnu: {}", raw_msg);
        Ok(())
    }
}
// ===== FONCTIONS DE TEST ET DÉMONSTRATION =====
/// Fonction de test pour démontrer la synchronisation
pub async fn test_sync_system() -> Result<()> {
info!("🧪 Démarrage du test du système de synchronisation");
let sync_manager = get_sync_manager();
let relay_id = sync_manager.get_relay_id();
info!("📡 Relay ID: {}", relay_id);
// Test 1: Création d'un message de synchronisation d'état
info!("📊 Test 1: Création d'un message de synchronisation d'état");
match sync_manager.create_state_sync() {
Ok(state_msg) => {
info!("✅ Message d'état créé: {}", state_msg.to_string());
}
Err(e) => {
error!("❌ Erreur lors de la création du message d'état: {}", e);
}
}
// Test 2: Création d'un message de synchronisation de santé
info!("🏥 Test 2: Création d'un message de synchronisation de santé");
match sync_manager.create_health_sync() {
Ok(health_msg) => {
info!("✅ Message de santé créé: {}", health_msg.to_string());
}
Err(e) => {
error!("❌ Erreur lors de la création du message de santé: {}", e);
}
}
// Test 3: Création d'un message de synchronisation de métriques
info!("📈 Test 3: Création d'un message de synchronisation de métriques");
match sync_manager.create_metrics_sync() {
Ok(metrics_msg) => {
info!("✅ Message de métriques créé: {}", metrics_msg.to_string());
}
Err(e) => {
error!("❌ Erreur lors de la création du message de métriques: {}", e);
}
}
// Test 4: Simulation de réception d'un message
info!("🔄 Test 4: Simulation de réception d'un message");
let test_sync_msg = SyncMessage {
sync_type: SyncType::StateSync,
relay_id: "test-relay-123".to_string(),
timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
sequence_number: 1,
payload: SyncPayload::StateData {
chain_tip: 1000,
wallet_balance: 50000,
active_processes: 5,
connected_peers: 3,
},
signature: None,
};
let test_addr: SocketAddr = "127.0.0.1:8080".parse()?;
match sync_manager.process_sync_message(test_sync_msg, test_addr) {
Ok(_) => {
info!("✅ Message de test traité avec succès");
}
Err(e) => {
error!("❌ Erreur lors du traitement du message de test: {}", e);
}
}
// Test 5: Affichage des métriques
info!("📊 Test 5: Affichage des métriques de synchronisation");
let metrics = sync_manager.get_metrics();
info!("📈 Métriques actuelles:");
info!(" - Messages envoyés: {}", metrics.messages_sent);
info!(" - Messages reçus: {}", metrics.messages_received);
info!(" - Erreurs: {}", metrics.errors);
info!(" - Dernière synchronisation: {}", metrics.last_sync_time);
info!("🎉 Test du système de synchronisation terminé avec succès !");
Ok(())
}
/// Fonction pour démarrer un test périodique de synchronisation
pub async fn start_sync_test_loop() {
info!("🚀 Démarrage de la boucle de test de synchronisation");
let mut interval = time::interval(Duration::from_secs(30)); // Toutes les 30 secondes
loop {
interval.tick().await;
if let Err(e) = test_sync_system().await {
error!("❌ Erreur lors du test de synchronisation: {}", e);
}
}
}