/**
 * Helper for creating sync subscriptions with relay rotation
 * Centralizes the pattern of subscription creation, event handling, and timeout management
 */

import type { Event, Filter } from 'nostr-tools'
import type { SimplePool } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { tryWithRelayRotation } from '../relayRotation'
import { getPrimaryRelaySync } from '../config'
import { createSubscription } from '@/types/nostr-tools-extended'

/**
 * Options accepted by {@link createSyncSubscription}.
 */
export interface SyncSubscriptionConfig {
  /** Pool used to open the subscription. */
  pool: SimplePoolWithSub
  /** Filters passed unchanged to `createSubscription`. */
  filters: Filter[]
  /**
   * Called for every event that passes `eventFilter`.
   * A returned promise is not awaited.
   */
  onEvent?: (event: Event) => void | Promise<void>
  /**
   * Called once with all collected events when the subscription ends
   * (end of stream or timeout). It is awaited before the result is returned.
   */
  onComplete?: (events: Event[]) => void | Promise<void>
  /** Maximum time in milliseconds to wait for end of stream. Defaults to 10000. */
  timeout?: number
  /**
   * Called with the relay URL being tried. When omitted, the relay is
   * reported to `syncProgressManager` instead.
   */
  updateProgress?: (relayUrl: string) => void
  /** When provided, events for which this returns `false` are ignored. */
  eventFilter?: (event: Event) => boolean
}

/**
 * Outcome of {@link createSyncSubscription}.
 */
export interface SyncSubscriptionResult {
  /** The subscription that was used; `unsub()` has already been called on it. */
  subscription: ReturnType<typeof createSubscription>
  /** URL of the relay the subscription was opened on. */
  relayUrl: string
  /** Events that passed `eventFilter`, in arrival order. */
  events: Event[]
}

/**
 * Create a sync subscription with relay rotation
 * Handles relay rotation, progress updates, event collection, and timeout
 *
 * The subscription is opened through `tryWithRelayRotation`; if that fails,
 * it is opened on the relay returned by `getPrimaryRelaySync()` instead.
 * Events are collected until the relay signals `eose` or `timeout`
 * milliseconds elapse, whichever happens first.
 *
 * @param config - Subscription options, see {@link SyncSubscriptionConfig}.
 * @returns The subscription, the relay URL used and the collected events.
 * @throws Error if no subscription could be created, or if unsubscribing or
 *   the `onComplete` callback fails.
 */
export async function createSyncSubscription(
  config: SyncSubscriptionConfig
): Promise<SyncSubscriptionResult> {
  const { pool, filters, onEvent, onComplete, timeout = 10000, updateProgress, eventFilter } = config

  const events: Event[] = []
  let sub: ReturnType<typeof createSubscription> | null = null
  let usedRelayUrl = ''

  // Try relays with rotation
  try {
    const result = await tryWithRelayRotation(
      pool as unknown as SimplePool,
      async (relayUrl, poolWithSub) => {
        usedRelayUrl = relayUrl

        // Update progress if callback provided
        if (updateProgress) {
          updateProgress(relayUrl)
        } else {
          // Default: notify progress manager
          const { syncProgressManager } = await import('../syncProgressManager')
          const currentProgress = syncProgressManager.getProgress()
          if (currentProgress) {
            syncProgressManager.setProgress({
              ...currentProgress,
              currentStep: 0,
              currentRelay: relayUrl,
            })
          }
        }

        return createSubscription(poolWithSub, [relayUrl], filters)
      },
      5000 // 5 second timeout per relay
    )
    sub = result
  } catch {
    // Fallback to primary relay if rotation fails
    usedRelayUrl = getPrimaryRelaySync()
    sub = createSubscription(pool, [usedRelayUrl], filters)
  }

  if (!sub) {
    throw new Error('Failed to create subscription')
  }

  // Capture in a const so the non-null type holds inside the closures below.
  const subscription = sub

  return new Promise<SyncSubscriptionResult>((resolve, reject) => {
    let finished = false

    const done = async (): Promise<void> => {
      // Both 'eose' and the timeout can call done(); only the first call acts.
      if (finished) {
        return
      }
      finished = true

      try {
        subscription.unsub()

        // Call onComplete callback if provided
        if (onComplete) {
          await onComplete(events)
        }

        resolve({
          subscription,
          relayUrl: usedRelayUrl,
          events,
        })
      } catch (error) {
        // Settle the promise so a failing unsub/onComplete cannot leave the caller hanging.
        reject(error instanceof Error ? error : new Error(String(error)))
      }
    }

    // Handle events
    subscription.on('event', (event: Event): void => {
      // Apply event filter if provided
      if (eventFilter && !eventFilter(event)) {
        return
      }

      events.push(event)

      // Call onEvent callback if provided
      if (onEvent) {
        void onEvent(event)
      }
    })

    // Timeout fallback
    const timer = setTimeout((): void => {
      void done()
    }, timeout)
    // unref is optional-called because the timer handle may not provide it.
    timer.unref?.()

    // Handle end of stream
    subscription.on('eose', (): void => {
      // The stream ended on its own, so the fallback timer is no longer needed.
      clearTimeout(timer)
      void done()
    })
  })
}