sdk_signer/src/relay-manager.ts
Sosthene 93eb637f1c
Some checks failed
4NK Template Sync / check-and-sync (push) Has been cancelled
Merge branch 'dev'
2025-08-30 11:39:31 +02:00

555 lines
17 KiB
TypeScript

import WebSocket from 'ws';
import type { AnkFlag } from '../pkg/sdk_client';
import { Service } from './service';
interface RelayConnection {
id: string;
ws: WebSocket;
url: string;
spAddress: string;
isConnected: boolean;
lastHeartbeat: number;
reconnectAttempts: number;
maxReconnectAttempts: number;
handshakeCompleted: boolean; // Track if handshake has been completed
handshakePromise?: Promise<void>; // Promise that resolves when handshake completes
handshakeResolve?: () => void; // Resolver for the handshake promise
}
interface QueuedMessage {
id: string;
flag: AnkFlag;
payload: any;
targetRelayId?: string;
timestamp: number;
expiresAt?: number;
retryCount: number;
maxRetries: number;
}
export class RelayManager {
private static instance: RelayManager;
private relays: Map<string, RelayConnection> = new Map();
private messageQueue: QueuedMessage[] = [];
private processingQueue = false;
private heartbeatInterval: NodeJS.Timeout | null = null;
private queueProcessingInterval: NodeJS.Timeout | null = null;
private reconnectInterval: NodeJS.Timeout | null = null;
private handshakeCallback?: (url: string, message: any) => void;
private handshakeCompletedRelays: Set<string> = new Set(); // Track completed handshakes
private constructor() {
this.startHeartbeat();
this.startQueueProcessing();
this.startReconnectMonitoring();
}
static getInstance(): RelayManager {
if (!RelayManager.instance) {
RelayManager.instance = new RelayManager();
}
return RelayManager.instance;
}
// Set callback for handshake messages
public setHandshakeCallback(callback: (url: string, message: any) => void): void {
this.handshakeCallback = callback;
}
/**
* Update the SP address for a relay by URL.
* @param wsurl - The WebSocket URL of the relay.
* @param spAddress - The SP Address of the relay.
*/
public updateRelay(wsurl: string, spAddress: string): void {
// Find the relay by URL and update its SP address
const connectedRelays = this.getConnectedRelays();
const relay = connectedRelays.find(r => r.url === wsurl);
if (relay) {
relay.spAddress = spAddress;
console.log(`Updated relay SP address: ${wsurl} -> ${spAddress}`);
} else {
console.warn(`Relay not found for URL: ${wsurl}`);
}
}
/**
* Retrieve the spAddress for a given wsurl.
* @param wsurl - The WebSocket URL to look up.
* @returns The SP Address if found, or undefined if not.
*/
public getSpAddress(wsurl: string): string | undefined {
const connectedRelays = this.getConnectedRelays();
const relay = connectedRelays.find(r => r.url === wsurl);
return relay?.spAddress;
}
// Relay Management - Outbound Connections
public async connectToRelay(relayId: string, wsUrl: string, spAddress: string): Promise<boolean> {
try {
console.log(`🔗 Connecting to relay ${relayId} at ${wsUrl}`);
const ws = new WebSocket(wsUrl);
const relay: RelayConnection = {
id: relayId,
ws,
url: wsUrl,
spAddress,
isConnected: false,
lastHeartbeat: Date.now(),
reconnectAttempts: 0,
maxReconnectAttempts: 5,
handshakeCompleted: false
};
// Set up WebSocket event handlers
ws.on('open', () => {
console.log(`✅ Connected to relay ${relayId}`);
relay.isConnected = true;
relay.reconnectAttempts = 0;
relay.lastHeartbeat = Date.now();
});
ws.on('message', async (data: WebSocket.Data) => {
try {
const message = JSON.parse(data.toString());
await this.handleRelayMessage(relayId, message);
} catch (error) {
console.error(`❌ Error parsing message from relay ${relayId}:`, error);
}
});
ws.on('close', () => {
console.log(`🔌 Disconnected from relay ${relayId}`);
relay.isConnected = false;
this.scheduleReconnect(relayId);
});
ws.on('error', (error) => {
console.error(`❌ WebSocket error for relay ${relayId}:`, error);
relay.isConnected = false;
});
this.relays.set(relayId, relay);
// Wait for connection to establish
return new Promise((resolve) => {
const timeout = setTimeout(() => {
resolve(false);
}, 5000);
ws.once('open', () => {
clearTimeout(timeout);
resolve(true);
});
});
} catch (error) {
console.error(`❌ Failed to connect to relay ${relayId}:`, error);
return false;
}
}
public disconnectFromRelay(relayId: string): void {
const relay = this.relays.get(relayId);
if (relay) {
relay.isConnected = false;
relay.ws.close();
this.relays.delete(relayId);
console.log(`🔌 Disconnected from relay ${relayId}`);
}
}
public getRelayById(relayId: string): RelayConnection | undefined {
return this.relays.get(relayId);
}
public getConnectedRelays(): RelayConnection[] {
return Array.from(this.relays.values()).filter(relay => relay.isConnected);
}
// Message Sending Methods using AnkFlag
public sendMessage(flag: AnkFlag, payload: any, targetRelayId?: string): void {
const msg: QueuedMessage = {
id: this.generateMessageId(),
flag,
payload,
targetRelayId,
timestamp: Date.now(),
expiresAt: Date.now() + 30000, // 30 seconds
retryCount: 0,
maxRetries: 3
};
this.queueMessage(msg);
}
public sendToRelay(relayId: string, flag: AnkFlag, content: any): boolean {
const relay = this.relays.get(relayId);
if (!relay || !relay.isConnected) {
console.warn(`⚠️ Cannot send to relay ${relayId}: not connected`);
return false;
}
try {
const message = {
flag,
content,
};
relay.ws.send(JSON.stringify(message));
return true;
} catch (error) {
console.error(`❌ Failed to send message to relay ${relayId}:`, error);
return false;
}
}
public broadcastToAllRelays(flag: AnkFlag, payload: any): number {
const connectedRelays = this.getConnectedRelays();
let sentCount = 0;
for (const relay of connectedRelays) {
if (this.sendToRelay(relay.id, flag, payload)) {
sentCount++;
}
}
console.log(`📡 Broadcasted to ${sentCount}/${connectedRelays.length} relays`);
return sentCount;
}
// Protocol-Specific Message Methods
public sendNewTxMessage(message: string, targetRelayId?: string): void {
// Use appropriate AnkFlag for new transaction
this.sendMessage("NewTx" as AnkFlag, message, targetRelayId);
}
public sendCommitMessage(message: string, targetRelayId?: string): void {
// Use appropriate AnkFlag for commit
this.sendMessage("Commit" as AnkFlag, message, targetRelayId);
}
public sendCipherMessages(ciphers: string[], targetRelayId?: string): void {
for (const cipher of ciphers) {
// Use appropriate AnkFlag for cipher
this.sendMessage("Cipher" as AnkFlag, cipher, targetRelayId);
}
}
public sendFaucetMessage(message: string, targetRelayId?: string): void {
// Use appropriate AnkFlag for faucet
console.log(`📨 Sending faucet message to relay ${targetRelayId}:`, message);
this.sendMessage("Faucet" as AnkFlag, message, targetRelayId);
}
// Message Queue Management
private queueMessage(message: QueuedMessage): void {
this.messageQueue.push(message);
console.log(`📬 Queued message ${message.id} with flag ${message.flag}`);
}
private async processQueue(): Promise<void> {
if (this.processingQueue || this.messageQueue.length === 0) {
return;
}
this.processingQueue = true;
try {
const message = this.messageQueue.shift();
if (!message) {
return;
}
// Check if message has expired
if (message.expiresAt && Date.now() > message.expiresAt) {
console.warn(`⏰ Message ${message.id} expired`);
return;
}
await this.deliverMessage(message);
} finally {
this.processingQueue = false;
}
}
private async deliverMessage(message: QueuedMessage): Promise<void> {
try {
let delivered = false;
if (message.targetRelayId) {
// Send to specific relay
delivered = this.sendToRelay(message.targetRelayId, message.flag, message.payload);
} else {
// Broadcast to all connected relays
const sentCount = this.broadcastToAllRelays(message.flag, message.payload);
delivered = sentCount > 0;
}
if (!delivered) {
throw new Error('No suitable relay available');
}
console.log(`✅ Message ${message.id} delivered`);
} catch (error) {
console.error(`❌ Failed to deliver message ${message.id}:`, error);
message.retryCount++;
if (message.retryCount < message.maxRetries) {
// Re-queue with exponential backoff
setTimeout(() => {
this.queueMessage(message);
}, Math.pow(2, message.retryCount) * 1000);
} else {
console.error(`💀 Message ${message.id} failed after ${message.maxRetries} retries`);
}
}
}
// Relay Message Handling
private handleRelayMessage(relayId: string, message: any): void {
console.log(`📨 Received message from relay ${relayId}:`);
if (message.flag === 'Handshake') {
console.log('🔑 Handshake message');
} else {
console.log(`🔑 ${message.flag} message: ${message.content}`);
}
// Handle different types of relay responses
if (message.flag) {
// Handle protocol-specific responses
this.handleProtocolMessage(relayId, message);
} else if (message.type === 'heartbeat') {
// Update heartbeat
const relay = this.relays.get(relayId);
if (relay) {
relay.lastHeartbeat = Date.now();
}
} else if (message.type === 'handshake') {
// Handle handshake messages
if (this.handshakeCallback) {
this.handshakeCallback(this.relays.get(relayId)?.url || 'unknown', message);
}
this.markHandshakeCompleted(relayId);
}
}
private handleProtocolMessage(relayId: string, message: any): void {
// Handle different AnkFlag responses
switch (message.flag) {
case "NewTx":
console.log(`📨 NewTx response from relay ${relayId}`);
setImmediate(() => {
Service.getInstance().parseNewTx(message.content);
});
break;
case "Commit":
console.log(`📨 Commit response from relay ${relayId}`);
// If we receive a commit response, that's basically an error
console.error(`❌ Commit response from relay ${relayId}:`, message.error);
break;
case "Cipher":
console.log(`📨 Cipher response from relay ${relayId}`);
setImmediate(() => {
Service.getInstance().parseCipher(message.content);
});
break;
case "Handshake":
console.log(`📨 Handshake response from relay ${relayId}`);
if (this.handshakeCallback) {
this.handshakeCallback(this.relays.get(relayId)?.url || 'unknown', message);
}
this.markHandshakeCompleted(relayId);
break;
default:
console.log(`📨 Unknown flag response from relay ${relayId}:`, message.flag);
}
}
// Reconnection Logic
private scheduleReconnect(relayId: string): void {
const relay = this.relays.get(relayId);
if (!relay || relay.reconnectAttempts >= relay.maxReconnectAttempts) {
console.log(`💀 Max reconnection attempts reached for relay ${relayId}`);
this.relays.delete(relayId);
return;
}
const delay = Math.pow(2, relay.reconnectAttempts) * 1000; // Exponential backoff
console.log(`🔄 Scheduling reconnect to relay ${relayId} in ${delay}ms (attempt ${relay.reconnectAttempts + 1})`);
setTimeout(async () => {
relay.reconnectAttempts++;
await this.connectToRelay(relayId, relay.url, relay.spAddress);
}, delay);
}
private startReconnectMonitoring(): void {
this.reconnectInterval = setInterval(() => {
// Check for disconnected relays and attempt reconnection
for (const [relayId, relay] of this.relays) {
if (!relay.isConnected && relay.reconnectAttempts < relay.maxReconnectAttempts) {
this.scheduleReconnect(relayId);
}
}
}, 10000); // Check every 10 seconds
}
// Heartbeat Management
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(() => {
const now = Date.now();
const heartbeatMessage = {
type: 'heartbeat',
timestamp: now
};
for (const [relayId, relay] of this.relays) {
if (relay.isConnected) {
try {
relay.ws.send(JSON.stringify(heartbeatMessage));
relay.lastHeartbeat = now;
} catch (error) {
console.error(`❌ Heartbeat failed for relay ${relayId}:`, error);
relay.isConnected = false;
}
}
}
}, 30000); // 30 seconds
}
private startQueueProcessing(): void {
this.queueProcessingInterval = setInterval(() => {
this.processQueue();
}, 100); // Process queue every 100ms
}
// Utility Methods
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
public getStats(): any {
return {
totalRelays: this.relays.size,
connectedRelays: this.getConnectedRelays().length,
queuedMessages: this.messageQueue.length
};
}
public shutdown(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
if (this.queueProcessingInterval) {
clearInterval(this.queueProcessingInterval);
}
if (this.reconnectInterval) {
clearInterval(this.reconnectInterval);
}
for (const [relayId] of this.relays) {
this.disconnectFromRelay(relayId);
}
console.log('🛑 Relay manager shutdown complete');
}
/**
* Mark a relay as having completed its handshake.
* @param relayId - The ID of the relay that completed handshake.
*/
private markHandshakeCompleted(relayId: string): void {
const relay = this.relays.get(relayId);
if (relay) {
relay.handshakeCompleted = true;
this.handshakeCompletedRelays.add(relayId);
console.log(`✅ Handshake completed for relay ${relayId}`);
if (relay.handshakeResolve) {
relay.handshakeResolve();
}
}
}
/**
* Wait for handshake completion from at least one relay
* @param timeoutMs - Timeout in milliseconds (default: 10000)
* @returns Promise that resolves when at least one handshake is completed
*/
public async waitForHandshake(timeoutMs: number = 10000): Promise<void> {
const startTime = Date.now();
const pollInterval = 100; // Check every 100ms
return new Promise<void>((resolve, reject) => {
const checkForHandshake = () => {
// Check if we have any completed handshakes
if (this.handshakeCompletedRelays.size > 0) {
console.log(`✅ Handshake completed from ${this.handshakeCompletedRelays.size} relay(s)`);
resolve();
return;
}
// Check timeout
if (Date.now() - startTime >= timeoutMs) {
reject(new Error(`No handshake completed after ${timeoutMs}ms timeout`));
return;
}
// Continue polling
setTimeout(checkForHandshake, pollInterval);
};
checkForHandshake();
});
}
/**
* Wait for handshake completion from a specific relay
* @param relayId - The relay ID to wait for
* @param timeoutMs - Timeout in milliseconds (default: 10000)
* @returns Promise that resolves when the specific relay's handshake is completed
*/
public async waitForRelayHandshake(relayId: string, timeoutMs: number = 10000): Promise<void> {
const relay = this.relays.get(relayId);
if (!relay) {
throw new Error(`Relay ${relayId} not found`);
}
if (relay.handshakeCompleted) {
return Promise.resolve();
}
if (!relay.handshakePromise) {
relay.handshakePromise = new Promise<void>((resolve, reject) => {
relay.handshakeResolve = resolve;
// Set timeout
setTimeout(() => {
reject(new Error(`Handshake timeout for relay ${relayId} after ${timeoutMs}ms`));
}, timeoutMs);
});
}
return relay.handshakePromise;
}
/**
* Check if a relay has completed handshake
* @param relayId - The relay ID to check
* @returns True if handshake is completed, false otherwise
*/
public isHandshakeCompleted(relayId: string): boolean {
return this.handshakeCompletedRelays.has(relayId);
}
/**
* Get all relays that have completed handshake
* @returns Array of relay IDs that have completed handshake
*/
public getHandshakeCompletedRelays(): string[] {
return Array.from(this.handshakeCompletedRelays);
}
}