story-research-zapwall/lib/helpers/syncSubscriptionHelper.ts
2026-01-07 03:10:40 +01:00

128 lines
3.4 KiB
TypeScript

/**
* Helper for creating sync subscriptions with relay rotation
* Centralizes the pattern of subscription creation, event handling, and timeout management
*/
import type { Event, Filter } from 'nostr-tools'
import type { SimplePool } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { tryWithRelayRotation } from '../relayRotation'
import { getPrimaryRelaySync } from '../config'
import { createSubscription } from '@/types/nostr-tools-extended'
/**
 * Options accepted by `createSyncSubscription`.
 */
export interface SyncSubscriptionConfig {
  /** Pool used both for relay rotation and for the primary-relay fallback. */
  pool: SimplePoolWithSub
  /** Filters passed to the subscription on whichever relay is used. */
  filters: Filter[]
  /**
   * Invoked for every event that passes `eventFilter`.
   * The returned promise (if any) is not awaited.
   */
  onEvent?: (event: Event) => void | Promise<void>
  /**
   * Awaited once, with all collected events, when the sync ends
   * (end of stored events or timeout) and before the result is returned.
   */
  onComplete?: (events: Event[]) => void | Promise<void>
  /** Maximum time in milliseconds to wait for end of stream. Defaults to 10000. */
  timeout?: number
  /**
   * Called with the URL of each relay that is tried.
   * When omitted, the shared sync progress manager is updated instead.
   */
  updateProgress?: (relayUrl: string) => void
  /** Predicate deciding whether an event is kept; events returning `false` are dropped. */
  eventFilter?: (event: Event) => boolean
}
/**
 * Outcome of `createSyncSubscription`.
 */
export interface SyncSubscriptionResult {
  /** Subscription handle that was used; it has already been unsubscribed. */
  subscription: ReturnType<typeof createSubscription>
  /** URL of the relay the subscription was opened on (the primary relay if rotation failed). */
  relayUrl: string
  /** Events collected during the sync that passed the optional event filter. */
  events: Event[]
}
/**
 * Create a sync subscription with relay rotation.
 *
 * Opens a subscription through `tryWithRelayRotation` (5 second timeout per
 * relay) and falls back to the primary relay when rotation throws. Events that
 * pass `eventFilter` are collected until the relay signals end of stored events
 * (`eose`) or `timeout` milliseconds elapse, whichever comes first. The
 * subscription is then closed, `onComplete` is awaited and the result returned.
 *
 * @param config - Pool, filters and optional callbacks; see `SyncSubscriptionConfig`.
 * @returns The closed subscription handle, the relay URL used and the collected events.
 * @throws Error if no subscription could be created. The returned promise also
 *   rejects with whatever `onComplete` (or closing the subscription) throws,
 *   instead of never settling.
 */
export async function createSyncSubscription(
  config: SyncSubscriptionConfig
): Promise<SyncSubscriptionResult> {
  const { pool, filters, onEvent, onComplete, timeout = 10000, updateProgress, eventFilter } = config
  const events: Event[] = []
  let sub: ReturnType<typeof createSubscription> | null = null
  let usedRelayUrl = ''

  // Try relays with rotation
  try {
    const result = await tryWithRelayRotation(
      pool as unknown as SimplePool,
      async (relayUrl, poolWithSub) => {
        usedRelayUrl = relayUrl
        // Update progress if callback provided
        if (updateProgress) {
          updateProgress(relayUrl)
        } else {
          // Default: notify progress manager
          const { syncProgressManager } = await import('../syncProgressManager')
          const currentProgress = syncProgressManager.getProgress()
          if (currentProgress) {
            syncProgressManager.setProgress({
              ...currentProgress,
              currentStep: 0,
              currentRelay: relayUrl,
            })
          }
        }
        return createSubscription(poolWithSub, [relayUrl], filters)
      },
      5000 // 5 second timeout per relay
    )
    sub = result
  } catch {
    // Fallback to primary relay if rotation fails (deliberate best effort)
    usedRelayUrl = getPrimaryRelaySync()
    sub = createSubscription(pool, [usedRelayUrl], filters)
  }

  if (!sub) {
    throw new Error('Failed to create subscription')
  }

  // Capture the narrowed, non-null handle so the closures below do not have to
  // deal with the nullable `let` binding.
  const activeSub = sub

  return new Promise<SyncSubscriptionResult>((resolve, reject) => {
    let finished = false

    // Ends the sync exactly once: stops the timer, closes the subscription,
    // runs onComplete and settles the outer promise.
    const done = async (): Promise<void> => {
      if (finished) {
        return
      }
      finished = true
      // Do not leave the timeout pending when EOSE ended the sync first.
      clearTimeout(timer)
      try {
        activeSub.unsub()
        // Call onComplete callback if provided
        if (onComplete) {
          await onComplete(events)
        }
      } catch (error: unknown) {
        // Surface the failure to the caller; otherwise the promise would hang.
        reject(error)
        return
      }
      resolve({
        subscription: activeSub,
        relayUrl: usedRelayUrl,
        events,
      })
    }

    // Timeout fallback; unref (where available) so it never keeps Node alive.
    const timer = setTimeout((): void => {
      void done()
    }, timeout)
    timer.unref?.()

    const reportOnEventFailure = (error: unknown): void => {
      console.error('Sync subscription onEvent handler failed:', error)
    }

    // Handle events
    activeSub.on('event', (event: Event): void => {
      // Late events must not mutate the result that was already handed out.
      if (finished) {
        return
      }
      // Apply event filter if provided
      if (eventFilter && !eventFilter(event)) {
        return
      }
      events.push(event)
      // Call onEvent callback if provided; it is fire-and-forget, so both
      // synchronous throws and async rejections are caught and logged.
      if (onEvent) {
        try {
          void Promise.resolve(onEvent(event)).catch(reportOnEventFailure)
        } catch (error: unknown) {
          reportOnEventFailure(error)
        }
      }
    })

    // Handle end of stream
    activeSub.on('eose', (): void => {
      void done()
    })
  })
}