story-research-zapwall/lib/helpers/syncSubscriptionHelper.ts
2026-01-10 09:41:57 +01:00

154 lines
5.1 KiB
TypeScript

/**
* Helper for creating sync subscriptions with relay rotation
* Centralizes the pattern of subscription creation, event handling, and timeout management
*/
import type { Event, Filter } from 'nostr-tools'
import type { SimplePool } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { tryWithRelayRotation } from '../relayRotation'
import { getPrimaryRelaySync } from '../config'
import { createSubscription } from '@/types/nostr-tools-extended'
export interface SyncSubscriptionConfig {
pool: SimplePoolWithSub
filters: Filter[]
onEvent?: (event: Event) => void | Promise<void>
onComplete?: (events: Event[]) => void | Promise<void>
timeout?: number
updateProgress?: (relayUrl: string) => void
eventFilter?: (event: Event) => boolean
}
export interface SyncSubscriptionResult {
subscription: ReturnType<typeof createSubscription>
relayUrl: string
events: Event[]
}
/**
* Create a sync subscription with relay rotation
* Handles relay rotation, progress updates, event collection, and timeout
*/
export async function createSyncSubscription(
config: SyncSubscriptionConfig
): Promise<SyncSubscriptionResult> {
const timeout = config.timeout ?? 10000
const { subscription, relayUrl } = await createSubscriptionWithRelayRotation({
pool: config.pool,
filters: config.filters,
updateProgress: config.updateProgress,
})
return collectSubscriptionEvents({
subscription,
relayUrl,
timeout,
onEvent: config.onEvent,
onComplete: config.onComplete,
eventFilter: config.eventFilter,
})
}
async function createSubscriptionWithRelayRotation(params: {
pool: SimplePoolWithSub
filters: Filter[]
updateProgress: ((relayUrl: string) => void) | undefined
}): Promise<{ subscription: ReturnType<typeof createSubscription>; relayUrl: string }> {
try {
let usedRelayUrl = ''
const subscription = await tryWithRelayRotation(
params.pool as unknown as SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
await updateSyncProgress(relayUrl, params.updateProgress)
return createSubscription(poolWithSub, [relayUrl], params.filters)
},
5000
)
if (!usedRelayUrl) {
throw new Error('Relay rotation did not return a relay URL')
}
return { subscription, relayUrl: usedRelayUrl }
} catch (error) {
console.warn('[createSyncSubscription] Relay rotation failed, falling back to primary relay:', error)
const relayUrl = getPrimaryRelaySync()
return { subscription: createSubscription(params.pool, [relayUrl], params.filters), relayUrl }
}
}
async function updateSyncProgress(relayUrl: string, updateProgress: ((relayUrl: string) => void) | undefined): Promise<void> {
if (updateProgress) {
updateProgress(relayUrl)
return
}
const { syncProgressManager } = await import('../syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (!currentProgress) {
return
}
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
async function collectSubscriptionEvents(params: {
subscription: ReturnType<typeof createSubscription>
relayUrl: string
timeout: number
onEvent: ((event: Event) => void | Promise<void>) | undefined
onComplete: ((events: Event[]) => void | Promise<void>) | undefined
eventFilter: ((event: Event) => boolean) | undefined
}): Promise<SyncSubscriptionResult> {
const events: Event[] = []
return new Promise<SyncSubscriptionResult>((resolve) => {
const finalize = createFinalizeHandler({ subscription: params.subscription, relayUrl: params.relayUrl, events, resolve, onComplete: params.onComplete })
registerSubscriptionEventHandlers({ subscription: params.subscription, events, onEvent: params.onEvent, eventFilter: params.eventFilter, finalize })
const timeoutId = setTimeout((): void => void finalize(), params.timeout)
timeoutId.unref?.()
})
}
function createFinalizeHandler(params: {
subscription: ReturnType<typeof createSubscription>
relayUrl: string
events: Event[]
resolve: (value: SyncSubscriptionResult) => void
onComplete: ((events: Event[]) => void | Promise<void>) | undefined
}): () => Promise<void> {
let finished = false
return async (): Promise<void> => {
if (finished) {
return
}
finished = true
params.subscription.unsub()
if (params.onComplete) {
await params.onComplete(params.events)
}
params.resolve({ subscription: params.subscription, relayUrl: params.relayUrl, events: params.events })
}
}
function registerSubscriptionEventHandlers(params: {
subscription: ReturnType<typeof createSubscription>
events: Event[]
onEvent: ((event: Event) => void | Promise<void>) | undefined
eventFilter: ((event: Event) => boolean) | undefined
finalize: () => Promise<void>
}): void {
params.subscription.on('event', (event: Event): void => {
if (params.eventFilter && !params.eventFilter(event)) {
return
}
params.events.push(event)
if (params.onEvent) {
void params.onEvent(event)
}
})
params.subscription.on('eose', (): void => {
void params.finalize()
})
}