4NK_node/tests/connectivity/test_websocket_messages.py
Nicolas Cantu b935cbab20 Réorganisation complète : tests, documentation et nettoyage
- Réorganisation des tests par catégorie (unit, integration, connectivity, external)
- Création de scripts d'exécution automatisés pour les tests
- Création de guides techniques complets (ARCHITECTURE.md, API.md)
- Transfert des informations depuis specs/ vers docs/
- Nettoyage et archivage des fichiers obsolètes
- Documentation complète des tests avec exemples
- Scripts de maintenance et nettoyage automatique
- Structure professionnelle prête pour l'évolution
2025-08-25 14:13:26 +02:00

293 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Script de test des messages WebSocket de synchronisation entre les relais sdk_relay
"""
import asyncio
import websockets
import json
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional
# Configuration des relais
RELAYS = [
{"id": "relay-1", "port": 8090, "name": "Relais 1"},
{"id": "relay-2", "port": 8092, "name": "Relais 2"},
{"id": "relay-3", "port": 8094, "name": "Relais 3"}
]
class RelayTester:
def __init__(self):
self.connections: Dict[str, websockets.WebSocketServerProtocol] = {}
self.messages_sent: List[Dict] = []
self.messages_received: List[Dict] = []
self.test_results: Dict[str, bool] = {}
async def connect_to_relay(self, relay: Dict) -> bool:
"""Se connecter à un relais via WebSocket"""
try:
uri = f"ws://localhost:{relay['port']}"
print(f"🔌 Connexion à {relay['name']} sur {uri}...")
websocket = await websockets.connect(uri, timeout=10)
self.connections[relay['id']] = websocket
print(f"✅ Connecté à {relay['name']}")
return True
except Exception as e:
print(f"❌ Échec de connexion à {relay['name']}: {e}")
return False
async def disconnect_from_relay(self, relay_id: str):
"""Se déconnecter d'un relais"""
if relay_id in self.connections:
await self.connections[relay_id].close()
del self.connections[relay_id]
print(f"🔌 Déconnecté de {relay_id}")
async def send_sync_message(self, relay_id: str, message_type: str, payload: Dict = None) -> bool:
"""Envoyer un message de synchronisation à un relais"""
if relay_id not in self.connections:
print(f"❌ Pas de connexion à {relay_id}")
return False
try:
message = {
"flag": "Sync",
"content": {
"type": message_type,
"relay_id": f"test-client-{uuid.uuid4().hex[:8]}",
"timestamp": int(time.time()),
"sequence": len(self.messages_sent) + 1,
"payload": payload or {
"test": True,
"message": f"Test {message_type} message",
"timestamp": int(time.time())
}
}
}
message_json = json.dumps(message)
await self.connections[relay_id].send(message_json)
self.messages_sent.append({
"relay_id": relay_id,
"message": message,
"timestamp": datetime.now()
})
print(f"📤 Message {message_type} envoyé à {relay_id}")
return True
except Exception as e:
print(f"❌ Erreur lors de l'envoi à {relay_id}: {e}")
return False
async def listen_for_messages(self, relay_id: str, timeout: int = 5):
"""Écouter les messages d'un relais"""
if relay_id not in self.connections:
return
try:
print(f"👂 Écoute des messages de {relay_id}...")
# Attendre un message avec timeout
message = await asyncio.wait_for(
self.connections[relay_id].recv(),
timeout=timeout
)
try:
parsed_message = json.loads(message)
self.messages_received.append({
"relay_id": relay_id,
"message": parsed_message,
"timestamp": datetime.now()
})
print(f"📥 Message reçu de {relay_id}:")
print(f" Type: {parsed_message.get('flag', 'Unknown')}")
print(f" Contenu: {json.dumps(parsed_message.get('content', {}), indent=2)}")
except json.JSONDecodeError:
print(f"📥 Message non-JSON reçu de {relay_id}: {message}")
except asyncio.TimeoutError:
print(f"⏰ Timeout - Aucun message reçu de {relay_id}")
except Exception as e:
print(f"❌ Erreur lors de l'écoute de {relay_id}: {e}")
async def test_relay_discovery(self):
"""Test de découverte de relais"""
print("\n🔍 Test de découverte de relais")
print("=" * 40)
for relay in RELAYS:
success = await self.send_sync_message(
relay['id'],
"RelaySync",
{
"discovery": True,
"relay_info": {
"id": relay['id'],
"port": relay['port'],
"capabilities": ["sync", "health", "metrics"]
}
}
)
self.test_results[f"discovery_{relay['id']}"] = success
if success:
await self.listen_for_messages(relay['id'], timeout=3)
async def test_health_sync(self):
"""Test de synchronisation de santé"""
print("\n🏥 Test de synchronisation de santé")
print("=" * 40)
for relay in RELAYS:
success = await self.send_sync_message(
relay['id'],
"HealthSync",
{
"health_status": {
"status": "healthy",
"uptime": int(time.time()),
"memory_usage": 1024,
"cpu_usage": 5.2
}
}
)
self.test_results[f"health_{relay['id']}"] = success
if success:
await self.listen_for_messages(relay['id'], timeout=3)
async def test_metrics_sync(self):
"""Test de synchronisation de métriques"""
print("\n📊 Test de synchronisation de métriques")
print("=" * 40)
for relay in RELAYS:
success = await self.send_sync_message(
relay['id'],
"MetricsSync",
{
"metrics": {
"messages_processed": 100,
"connections_active": 5,
"sync_requests": 25,
"errors": 0
}
}
)
self.test_results[f"metrics_{relay['id']}"] = success
if success:
await self.listen_for_messages(relay['id'], timeout=3)
async def test_mesh_communication(self):
"""Test de communication en mesh entre relais"""
print("\n🕸️ Test de communication en mesh")
print("=" * 40)
# Envoyer un message de chaque relais vers les autres
for source_relay in RELAYS:
print(f"\n📡 {source_relay['name']} envoie des messages aux autres relais...")
for target_relay in RELAYS:
if source_relay['id'] != target_relay['id']:
success = await self.send_sync_message(
source_relay['id'],
"MeshSync",
{
"target_relay": target_relay['id'],
"mesh_message": {
"source": source_relay['id'],
"destination": target_relay['id'],
"data": f"Message mesh de {source_relay['id']} vers {target_relay['id']}"
}
}
)
self.test_results[f"mesh_{source_relay['id']}_to_{target_relay['id']}"] = success
def print_test_summary(self):
"""Afficher le résumé des tests"""
print("\n📋 Résumé des tests")
print("=" * 40)
total_tests = len(self.test_results)
successful_tests = sum(1 for success in self.test_results.values() if success)
print(f"Tests réussis: {successful_tests}/{total_tests}")
print(f"Taux de réussite: {(successful_tests/total_tests)*100:.1f}%")
print("\nDétail par test:")
for test_name, success in self.test_results.items():
status = "" if success else ""
print(f" {status} {test_name}")
print(f"\nMessages envoyés: {len(self.messages_sent)}")
print(f"Messages reçus: {len(self.messages_received)}")
async def run_all_tests(self):
"""Exécuter tous les tests"""
print("🧪 Test des Messages WebSocket de Synchronisation")
print("=" * 55)
print(f"Démarrage: {datetime.now()}")
print("")
# 1. Connexion aux relais
print("🔌 Connexion aux relais...")
connection_results = []
for relay in RELAYS:
success = await self.connect_to_relay(relay)
connection_results.append(success)
if not any(connection_results):
print("❌ Aucune connexion réussie. Arrêt des tests.")
return
print(f"{sum(connection_results)}/{len(RELAYS)} relais connectés")
# 2. Tests de synchronisation
await self.test_relay_discovery()
await asyncio.sleep(2)
await self.test_health_sync()
await asyncio.sleep(2)
await self.test_metrics_sync()
await asyncio.sleep(2)
await self.test_mesh_communication()
await asyncio.sleep(2)
# 3. Déconnexion
print("\n🔌 Déconnexion des relais...")
for relay in RELAYS:
await self.disconnect_from_relay(relay['id'])
# 4. Résumé
self.print_test_summary()
print(f"\n🏁 Tests terminés: {datetime.now()}")
async def main():
"""Fonction principale"""
tester = RelayTester()
await tester.run_all_tests()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n⏹️ Tests interrompus par l'utilisateur")
except Exception as e:
print(f"\n❌ Erreur lors des tests: {e}")