/**
 * Helper for creating sync subscriptions with relay rotation.
 * Centralizes the pattern of subscription creation, event handling, and timeout management.
 */

import type { Event, Filter } from 'nostr-tools'
import type { SimplePool } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { tryWithRelayRotation } from '../relayRotation'
import { getPrimaryRelaySync } from '../config'
import { createSubscription } from '@/types/nostr-tools-extended'

/** Options accepted by {@link createSyncSubscription}. */
export interface SyncSubscriptionConfig {
  /** Pool used to open the subscription. */
  pool: SimplePoolWithSub
  /** Filters forwarded unchanged to `createSubscription`. */
  filters: Filter[]
  /**
   * Called for every collected event (after `eventFilter`, if any).
   * The returned promise is not awaited; failures are logged and do not stop collection.
   */
  onEvent?: (event: Event) => void | Promise<void>
  /** Called once with all collected events, right before the result promise resolves. */
  onComplete?: (events: Event[]) => void | Promise<void>
  /** Maximum collection time in milliseconds before finishing without EOSE. Defaults to 10000. */
  timeout?: number
  /**
   * Called with the relay URL of each connection attempt. When omitted, the shared
   * `syncProgressManager` is updated instead.
   */
  updateProgress?: (relayUrl: string) => void
  /** Predicate deciding whether an incoming event is collected; events it rejects are dropped. */
  eventFilter?: (event: Event) => boolean
}

/** Outcome of a finished sync subscription. */
export interface SyncSubscriptionResult {
  /** The subscription that was used; `unsub()` has already been called on it. */
  subscription: ReturnType<typeof createSubscription>
  /** URL of the relay the subscription was opened on. */
  relayUrl: string
  /** Events collected until EOSE or timeout, in arrival order. */
  events: Event[]
}

/**
 * Create a sync subscription with relay rotation.
 * Handles relay rotation, progress updates, event collection, and timeout.
 *
 * @param config - Pool, filters and optional callbacks; see {@link SyncSubscriptionConfig}.
 * @returns The subscription, the relay it ran on and the collected events, once EOSE is
 *   received or the timeout elapses (whichever happens first).
 */
export async function createSyncSubscription(
  config: SyncSubscriptionConfig
): Promise<SyncSubscriptionResult> {
  const timeout = config.timeout ?? 10000

  const { subscription, relayUrl } = await createSubscriptionWithRelayRotation({
    pool: config.pool,
    filters: config.filters,
    updateProgress: config.updateProgress,
  })

  return collectSubscriptionEvents({
    subscription,
    relayUrl,
    timeout,
    onEvent: config.onEvent,
    onComplete: config.onComplete,
    eventFilter: config.eventFilter,
  })
}

/**
 * Open the subscription through `tryWithRelayRotation`, remembering which relay was used.
 * If rotation throws (or never reports a relay URL), fall back to a subscription on the
 * primary relay returned by `getPrimaryRelaySync()`.
 */
async function createSubscriptionWithRelayRotation(params: {
  pool: SimplePoolWithSub
  filters: Filter[]
  updateProgress: ((relayUrl: string) => void) | undefined
}): Promise<{ subscription: ReturnType<typeof createSubscription>; relayUrl: string }> {
  try {
    // Overwritten on every attempt, so after success it holds the relay that worked.
    let usedRelayUrl = ''
    const subscription = await tryWithRelayRotation(
      params.pool as unknown as SimplePool,
      async (relayUrl, poolWithSub) => {
        usedRelayUrl = relayUrl
        await updateSyncProgress(relayUrl, params.updateProgress)
        return createSubscription(poolWithSub, [relayUrl], params.filters)
      },
      // NOTE(review): presumably the per-relay attempt timeout in ms; confirm against tryWithRelayRotation.
      5000
    )
    if (!usedRelayUrl) {
      throw new Error('Relay rotation did not return a relay URL')
    }
    return { subscription, relayUrl: usedRelayUrl }
  } catch (error) {
    console.warn('[createSyncSubscription] Relay rotation failed, falling back to primary relay:', error)
    const relayUrl = getPrimaryRelaySync()
    return { subscription: createSubscription(params.pool, [relayUrl], params.filters), relayUrl }
  }
}

/**
 * Report the relay currently being tried.
 * Uses the caller-supplied callback when present; otherwise lazily imports
 * `syncProgressManager` and, if a progress record exists, rewrites it with
 * `currentStep: 0` and the new `currentRelay`.
 */
async function updateSyncProgress(
  relayUrl: string,
  updateProgress: ((relayUrl: string) => void) | undefined
): Promise<void> {
  if (updateProgress) {
    updateProgress(relayUrl)
    return
  }
  const { syncProgressManager } = await import('../syncProgressManager')
  const currentProgress = syncProgressManager.getProgress()
  if (!currentProgress) {
    return
  }
  syncProgressManager.setProgress({
    ...currentProgress,
    currentStep: 0,
    currentRelay: relayUrl,
  })
}

/**
 * Collect events from the subscription until EOSE arrives or `timeout` ms elapse.
 * The returned promise always resolves (never rejects) once finalization has run.
 */
async function collectSubscriptionEvents(params: {
  subscription: ReturnType<typeof createSubscription>
  relayUrl: string
  timeout: number
  onEvent: ((event: Event) => void | Promise<void>) | undefined
  onComplete: ((events: Event[]) => void | Promise<void>) | undefined
  eventFilter: ((event: Event) => boolean) | undefined
}): Promise<SyncSubscriptionResult> {
  const events: Event[] = []

  return new Promise<SyncSubscriptionResult>((resolve) => {
    let timeoutId: ReturnType<typeof setTimeout> | undefined

    const finalize = createFinalizeHandler({
      subscription: params.subscription,
      relayUrl: params.relayUrl,
      events,
      resolve,
      onComplete: params.onComplete,
      // Drop the pending timer when EOSE finishes first, so it does not linger until it fires.
      cancelTimeout: (): void => {
        if (timeoutId !== undefined) {
          clearTimeout(timeoutId)
        }
      },
    })

    registerSubscriptionEventHandlers({
      subscription: params.subscription,
      events,
      onEvent: params.onEvent,
      eventFilter: params.eventFilter,
      finalize,
    })

    timeoutId = setTimeout((): void => void finalize(), params.timeout)
    // Where the timer handle supports it (Node.js), do not keep the process alive for it.
    timeoutId.unref?.()
  })
}

/**
 * Build the one-shot finalizer shared by the EOSE handler and the timeout.
 * The first call cancels the timeout, unsubscribes, runs `onComplete` and resolves;
 * later calls are no-ops.
 *
 * Errors from `unsub()` or `onComplete` are logged rather than propagated: the guard flag
 * is already set at that point, so skipping `resolve` would leave the caller's promise
 * pending forever.
 */
function createFinalizeHandler(params: {
  subscription: ReturnType<typeof createSubscription>
  relayUrl: string
  events: Event[]
  resolve: (value: SyncSubscriptionResult) => void
  onComplete: ((events: Event[]) => void | Promise<void>) | undefined
  cancelTimeout: () => void
}): () => Promise<void> {
  let finished = false

  return async (): Promise<void> => {
    if (finished) {
      return
    }
    finished = true
    params.cancelTimeout()

    try {
      params.subscription.unsub()
    } catch (error) {
      console.warn('[createSyncSubscription] Failed to unsubscribe:', error)
    }

    try {
      if (params.onComplete) {
        await params.onComplete(params.events)
      }
    } catch (error) {
      console.error('[createSyncSubscription] onComplete callback failed:', error)
    }

    params.resolve({
      subscription: params.subscription,
      relayUrl: params.relayUrl,
      events: params.events,
    })
  }
}

/**
 * Attach the `event` and `eose` listeners.
 * Events passing `eventFilter` are appended to `events` and forwarded to `onEvent`;
 * `eose` triggers finalization.
 */
function registerSubscriptionEventHandlers(params: {
  subscription: ReturnType<typeof createSubscription>
  events: Event[]
  onEvent: ((event: Event) => void | Promise<void>) | undefined
  eventFilter: ((event: Event) => boolean) | undefined
  finalize: () => Promise<void>
}): void {
  params.subscription.on('event', (event: Event): void => {
    if (params.eventFilter && !params.eventFilter(event)) {
      return
    }
    params.events.push(event)
    if (params.onEvent) {
      notifyEventListener(params.onEvent, event)
    }
  })

  params.subscription.on('eose', (): void => {
    void params.finalize()
  })
}

/**
 * Invoke `onEvent` without awaiting it, logging both synchronous throws and asynchronous
 * rejections so a faulty listener cannot break the subscription or cause an unhandled rejection.
 */
function notifyEventListener(
  onEvent: (event: Event) => void | Promise<void>,
  event: Event
): void {
  try {
    void Promise.resolve(onEvent(event)).catch((error: unknown): void => {
      console.error('[createSyncSubscription] onEvent callback failed:', error)
    })
  } catch (error) {
    console.error('[createSyncSubscription] onEvent callback failed:', error)
  }
}