128 lines
3.4 KiB
TypeScript
128 lines
3.4 KiB
TypeScript
/**
|
|
* Helper for creating sync subscriptions with relay rotation
|
|
* Centralizes the pattern of subscription creation, event handling, and timeout management
|
|
*/
|
|
|
|
import type { Event, Filter } from 'nostr-tools'
|
|
import type { SimplePool } from 'nostr-tools'
|
|
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
|
|
import { tryWithRelayRotation } from '../relayRotation'
|
|
import { getPrimaryRelaySync } from '../config'
|
|
import { createSubscription } from '@/types/nostr-tools-extended'
|
|
|
|
export interface SyncSubscriptionConfig {
|
|
pool: SimplePoolWithSub
|
|
filters: Filter[]
|
|
onEvent?: (event: Event) => void | Promise<void>
|
|
onComplete?: (events: Event[]) => void | Promise<void>
|
|
timeout?: number
|
|
updateProgress?: (relayUrl: string) => void
|
|
eventFilter?: (event: Event) => boolean
|
|
}
|
|
|
|
export interface SyncSubscriptionResult {
|
|
subscription: ReturnType<typeof createSubscription>
|
|
relayUrl: string
|
|
events: Event[]
|
|
}
|
|
|
|
/**
|
|
* Create a sync subscription with relay rotation
|
|
* Handles relay rotation, progress updates, event collection, and timeout
|
|
*/
|
|
export async function createSyncSubscription(
|
|
config: SyncSubscriptionConfig
|
|
): Promise<SyncSubscriptionResult> {
|
|
const { pool, filters, onEvent, onComplete, timeout = 10000, updateProgress, eventFilter } = config
|
|
|
|
const events: Event[] = []
|
|
let sub: ReturnType<typeof createSubscription> | null = null
|
|
let usedRelayUrl = ''
|
|
|
|
// Try relays with rotation
|
|
try {
|
|
const result = await tryWithRelayRotation(
|
|
pool as unknown as SimplePool,
|
|
async (relayUrl, poolWithSub) => {
|
|
usedRelayUrl = relayUrl
|
|
|
|
// Update progress if callback provided
|
|
if (updateProgress) {
|
|
updateProgress(relayUrl)
|
|
} else {
|
|
// Default: notify progress manager
|
|
const { syncProgressManager } = await import('../syncProgressManager')
|
|
const currentProgress = syncProgressManager.getProgress()
|
|
if (currentProgress) {
|
|
syncProgressManager.setProgress({
|
|
...currentProgress,
|
|
currentStep: 0,
|
|
currentRelay: relayUrl,
|
|
})
|
|
}
|
|
}
|
|
|
|
return createSubscription(poolWithSub, [relayUrl], filters)
|
|
},
|
|
5000 // 5 second timeout per relay
|
|
)
|
|
sub = result
|
|
} catch {
|
|
// Fallback to primary relay if rotation fails
|
|
usedRelayUrl = getPrimaryRelaySync()
|
|
sub = createSubscription(pool, [usedRelayUrl], filters)
|
|
}
|
|
|
|
if (!sub) {
|
|
throw new Error('Failed to create subscription')
|
|
}
|
|
|
|
return new Promise<SyncSubscriptionResult>((resolve) => {
|
|
let finished = false
|
|
|
|
const done = async (): Promise<void> => {
|
|
if (finished) {
|
|
return
|
|
}
|
|
finished = true
|
|
sub?.unsub()
|
|
|
|
// Call onComplete callback if provided
|
|
if (onComplete) {
|
|
await onComplete(events)
|
|
}
|
|
|
|
resolve({
|
|
subscription: sub,
|
|
relayUrl: usedRelayUrl,
|
|
events,
|
|
})
|
|
}
|
|
|
|
// Handle events
|
|
sub.on('event', (event: Event): void => {
|
|
// Apply event filter if provided
|
|
if (eventFilter && !eventFilter(event)) {
|
|
return
|
|
}
|
|
|
|
events.push(event)
|
|
|
|
// Call onEvent callback if provided
|
|
if (onEvent) {
|
|
void onEvent(event)
|
|
}
|
|
})
|
|
|
|
// Handle end of stream
|
|
sub.on('eose', (): void => {
|
|
void done()
|
|
})
|
|
|
|
// Timeout fallback
|
|
setTimeout((): void => {
|
|
void done()
|
|
}, timeout).unref?.()
|
|
})
|
|
}
|