#!/usr/bin/env python3
"""
Test script for the WebSocket synchronisation messages exchanged
between sdk_relay relays.

The script connects to every relay listed in ``RELAYS``, sends a series of
"Sync" messages (discovery, health, metrics, mesh), prints whatever each
relay answers, and finally prints a pass/fail summary.
"""

import asyncio
import websockets
import json
import time
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

# Relays under test; each one is reached at ws://localhost:<port>.
RELAYS = [
    {"id": "relay-1", "port": 8090, "name": "Relais 1"},
    {"id": "relay-2", "port": 8092, "name": "Relais 2"},
    {"id": "relay-3", "port": 8094, "name": "Relais 3"}
]


class RelayTester:
    """Drive the synchronisation tests and keep track of their outcome."""

    def __init__(self):
        # Open client connections, keyed by relay id.
        self.connections: Dict[str, Any] = {}
        # One record per message successfully sent / received.
        self.messages_sent: List[Dict] = []
        self.messages_received: List[Dict] = []
        # Test name -> True when the message could be sent.
        self.test_results: Dict[str, bool] = {}

    async def connect_to_relay(self, relay: Dict, timeout: float = 10) -> bool:
        """Open a WebSocket connection to ``relay``.

        Args:
            relay: Entry of ``RELAYS`` (needs ``id``, ``port`` and ``name``).
            timeout: Maximum number of seconds to wait for the connection.

        Returns:
            True when the connection was established and stored in
            ``self.connections``, False otherwise (the error is printed).
        """
        try:
            uri = f"ws://localhost:{relay['port']}"
            print(f"🔌 Connexion à {relay['name']} sur {uri}...")

            # Bound the handshake with asyncio.wait_for instead of handing a
            # ``timeout=`` keyword to websockets.connect: that keyword is not
            # the documented way to limit the opening handshake.
            websocket = await asyncio.wait_for(
                websockets.connect(uri), timeout=timeout
            )
            self.connections[relay['id']] = websocket

            print(f"✅ Connecté à {relay['name']}")
            return True

        except Exception as e:
            # Includes asyncio.TimeoutError raised by wait_for.
            print(f"❌ Échec de connexion à {relay['name']}: {e}")
            return False

    async def disconnect_from_relay(self, relay_id: str):
        """Close and forget the connection to ``relay_id`` (no-op if absent)."""
        # Remove the entry first so a failing close() cannot leave a stale
        # connection behind in self.connections.
        websocket = self.connections.pop(relay_id, None)
        if websocket is None:
            return

        try:
            await websocket.close()
        except Exception as e:
            # Keep going: the caller usually still has other relays to close.
            print(f"❌ Erreur lors de la déconnexion de {relay_id}: {e}")
            return

        print(f"🔌 Déconnecté de {relay_id}")

    async def send_sync_message(self, relay_id: str, message_type: str,
                                payload: Optional[Dict] = None) -> bool:
        """Send one "Sync" message to a connected relay.

        Args:
            relay_id: Id of a relay present in ``self.connections``.
            message_type: Value placed in ``content.type``.
            payload: Message payload; when omitted or empty a small default
                test payload is sent instead.

        Returns:
            True when the message was sent (and recorded in
            ``self.messages_sent``), False when the relay is not connected
            or the send failed.
        """
        if relay_id not in self.connections:
            print(f"❌ Pas de connexion à {relay_id}")
            return False

        try:
            message = {
                "flag": "Sync",
                "content": {
                    "type": message_type,
                    # A fresh pseudo client id is generated for every message.
                    "relay_id": f"test-client-{uuid.uuid4().hex[:8]}",
                    "timestamp": int(time.time()),
                    "sequence": len(self.messages_sent) + 1,
                    "payload": payload or {
                        "test": True,
                        "message": f"Test {message_type} message",
                        "timestamp": int(time.time())
                    }
                }
            }

            message_json = json.dumps(message)
            await self.connections[relay_id].send(message_json)

            self.messages_sent.append({
                "relay_id": relay_id,
                "message": message,
                "timestamp": datetime.now()
            })

            print(f"📤 Message {message_type} envoyé à {relay_id}")
            return True

        except Exception as e:
            print(f"❌ Erreur lors de l'envoi à {relay_id}: {e}")
            return False

    async def listen_for_messages(self, relay_id: str, timeout: int = 5):
        """Wait for a single message from ``relay_id`` and print it.

        Args:
            relay_id: Id of a relay present in ``self.connections``;
                the call returns immediately otherwise.
            timeout: Seconds to wait before giving up.

        JSON messages are recorded in ``self.messages_received``; anything
        else is only printed. Timeouts and errors are printed, not raised.
        """
        if relay_id not in self.connections:
            return

        try:
            print(f"👂 Écoute des messages de {relay_id}...")

            # Wait for exactly one message, bounded by the timeout.
            message = await asyncio.wait_for(
                self.connections[relay_id].recv(),
                timeout=timeout
            )

            try:
                parsed_message = json.loads(message)
                self.messages_received.append({
                    "relay_id": relay_id,
                    "message": parsed_message,
                    "timestamp": datetime.now()
                })

                print(f"📥 Message reçu de {relay_id}:")
                print(f" Type: {parsed_message.get('flag', 'Unknown')}")
                print(f" Contenu: {json.dumps(parsed_message.get('content', {}), indent=2)}")

            except json.JSONDecodeError:
                print(f"📥 Message non-JSON reçu de {relay_id}: {message}")

        except asyncio.TimeoutError:
            print(f"⏰ Timeout - Aucun message reçu de {relay_id}")
        except Exception as e:
            print(f"❌ Erreur lors de l'écoute de {relay_id}: {e}")

    async def _sync_each_relay(self, result_prefix: str, message_type: str,
                               build_payload: Callable[[Dict], Dict]) -> None:
        """Send ``message_type`` to every relay and listen for one reply.

        The send outcome is stored under ``<result_prefix>_<relay id>`` in
        ``self.test_results``; a reply is only awaited after a successful send.
        """
        for relay in RELAYS:
            success = await self.send_sync_message(
                relay['id'],
                message_type,
                build_payload(relay)
            )
            self.test_results[f"{result_prefix}_{relay['id']}"] = success

            if success:
                await self.listen_for_messages(relay['id'], timeout=3)

    async def test_relay_discovery(self):
        """Send a "RelaySync" discovery message to every relay."""
        print("\n🔍 Test de découverte de relais")
        print("=" * 40)

        await self._sync_each_relay(
            "discovery",
            "RelaySync",
            lambda relay: {
                "discovery": True,
                "relay_info": {
                    "id": relay['id'],
                    "port": relay['port'],
                    "capabilities": ["sync", "health", "metrics"]
                }
            }
        )

    async def test_health_sync(self):
        """Send a "HealthSync" message with sample health data to every relay."""
        print("\n🏥 Test de synchronisation de santé")
        print("=" * 40)

        await self._sync_each_relay(
            "health",
            "HealthSync",
            lambda relay: {
                "health_status": {
                    "status": "healthy",
                    "uptime": int(time.time()),
                    "memory_usage": 1024,
                    "cpu_usage": 5.2
                }
            }
        )

    async def test_metrics_sync(self):
        """Send a "MetricsSync" message with sample metrics to every relay."""
        print("\n📊 Test de synchronisation de métriques")
        print("=" * 40)

        await self._sync_each_relay(
            "metrics",
            "MetricsSync",
            lambda relay: {
                "metrics": {
                    "messages_processed": 100,
                    "connections_active": 5,
                    "sync_requests": 25,
                    "errors": 0
                }
            }
        )

    async def test_mesh_communication(self):
        """Send a "MeshSync" message from every relay towards every other one.

        Only the send outcome is recorded; no reply is awaited here.
        """
        print("\n🕸️ Test de communication en mesh")
        print("=" * 40)

        for source_relay in RELAYS:
            print(f"\n📡 {source_relay['name']} envoie des messages aux autres relais...")

            for target_relay in RELAYS:
                # A relay does not message itself.
                if source_relay['id'] == target_relay['id']:
                    continue

                success = await self.send_sync_message(
                    source_relay['id'],
                    "MeshSync",
                    {
                        "target_relay": target_relay['id'],
                        "mesh_message": {
                            "source": source_relay['id'],
                            "destination": target_relay['id'],
                            "data": f"Message mesh de {source_relay['id']} vers {target_relay['id']}"
                        }
                    }
                )
                self.test_results[f"mesh_{source_relay['id']}_to_{target_relay['id']}"] = success

    def print_test_summary(self):
        """Print the pass/fail summary and the sent/received message counts."""
        print("\n📋 Résumé des tests")
        print("=" * 40)

        total_tests = len(self.test_results)
        successful_tests = sum(1 for success in self.test_results.values() if success)

        print(f"Tests réussis: {successful_tests}/{total_tests}")
        if total_tests:
            print(f"Taux de réussite: {(successful_tests/total_tests)*100:.1f}%")
        else:
            # No test was recorded: avoid dividing by zero.
            print("Taux de réussite: N/A")

        print("\nDétail par test:")
        for test_name, success in self.test_results.items():
            status = "✅" if success else "❌"
            print(f" {status} {test_name}")

        print(f"\nMessages envoyés: {len(self.messages_sent)}")
        print(f"Messages reçus: {len(self.messages_received)}")

    async def run_all_tests(self):
        """Connect to the relays, run every test, disconnect and summarise.

        Stops early (without a summary) when no relay could be reached.
        """
        print("🧪 Test des Messages WebSocket de Synchronisation")
        print("=" * 55)
        print(f"Démarrage: {datetime.now()}")
        print("")

        # 1. Connect to the relays.
        print("🔌 Connexion aux relais...")
        connection_results = []
        for relay in RELAYS:
            success = await self.connect_to_relay(relay)
            connection_results.append(success)

        if not any(connection_results):
            print("❌ Aucune connexion réussie. Arrêt des tests.")
            return

        print(f"✅ {sum(connection_results)}/{len(RELAYS)} relais connectés")

        try:
            # 2. Synchronisation tests, with a pause between phases.
            await self.test_relay_discovery()
            await asyncio.sleep(2)

            await self.test_health_sync()
            await asyncio.sleep(2)

            await self.test_metrics_sync()
            await asyncio.sleep(2)

            await self.test_mesh_communication()
            await asyncio.sleep(2)
        finally:
            # 3. Always release the sockets, even if a phase raised.
            print("\n🔌 Déconnexion des relais...")
            for relay in RELAYS:
                await self.disconnect_from_relay(relay['id'])

        # 4. Summary.
        self.print_test_summary()

        print(f"\n🏁 Tests terminés: {datetime.now()}")


async def main():
    """Script entry point: run the full test sequence once."""
    tester = RelayTester()
    await tester.run_all_tests()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n⏹️ Tests interrompus par l'utilisateur")
    except Exception as e:
        print(f"\n❌ Erreur lors des tests: {e}")