import type { Event, Filter } from 'nostr-tools' import type { SimplePoolWithSub } from '@/types/nostr-tools-extended' import { createSubscription } from '@/types/nostr-tools-extended' import { PLATFORM_SERVICE } from '../platformConfig' import { extractTagsFromEvent } from '../nostrTagSystem' import { logRelaySyncProgress, logTargetEventAccepted, logTargetEventReceived, logTargetEventRejected, logTargetEventTags, } from './debug' export type SyncProgress = { currentStep: number; totalSteps: number; completed: boolean; currentRelay?: string } export type SyncProgressManager = { setProgress: (progress: SyncProgress | null) => void } export type RelaySessionManager = { markRelayFailed: (relayUrl: string) => void } export type SetSyncSubscription = (sub: { unsub: () => void } | null) => void export async function collectEventsFromRelays(params: { pool: SimplePoolWithSub activeRelays: string[] filters: Filter[] relaySessionManager: RelaySessionManager syncProgressManager: SyncProgressManager syncTimeoutMs: number setSyncSubscription: SetSyncSubscription }): Promise { const allEvents: Event[] = [] const processedEventIds = new Set() for (let i = 0; i < params.activeRelays.length; i++) { const relayUrl = params.activeRelays[i] if (relayUrl) { await collectEventsFromRelay({ ...params, relayUrl, relayIndex: i, processedEventIds, allEvents, }) } } return allEvents } async function collectEventsFromRelay(params: { pool: SimplePoolWithSub relayUrl: string relayIndex: number activeRelays: string[] filters: Filter[] relaySessionManager: RelaySessionManager syncProgressManager: SyncProgressManager processedEventIds: Set allEvents: Event[] syncTimeoutMs: number setSyncSubscription: SetSyncSubscription }): Promise { setRelayProgress(params.syncProgressManager, 0, params.activeRelays.length, params.relayUrl) try { const relayEvents = await readEventsFromRelay({ pool: params.pool, relayUrl: params.relayUrl, filters: params.filters, syncTimeoutMs: params.syncTimeoutMs, setSyncSubscription: params.setSyncSubscription, }) addUniqueEvents({ relayEvents, processedEventIds: params.processedEventIds, allEvents: params.allEvents }) await updateRelayLastSyncDateIfNeeded({ relayUrl: params.relayUrl, eventCount: relayEvents.length }) setRelayProgress(params.syncProgressManager, params.relayIndex + 1, params.activeRelays.length, params.relayUrl) } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error) console.warn(`[PlatformSync] Relay ${params.relayUrl} failed: ${errorMessage}`) params.relaySessionManager.markRelayFailed(params.relayUrl) setRelayProgress(params.syncProgressManager, params.relayIndex + 1, params.activeRelays.length, params.relayUrl) } } function setRelayProgress( syncProgressManager: SyncProgressManager, currentStep: number, totalSteps: number, relayUrl: string ): void { syncProgressManager.setProgress({ currentStep, totalSteps, completed: false, currentRelay: relayUrl }) } async function readEventsFromRelay(params: { pool: SimplePoolWithSub relayUrl: string filters: Filter[] syncTimeoutMs: number setSyncSubscription: SetSyncSubscription }): Promise { console.warn(`[PlatformSync] Synchronizing from relay: ${params.relayUrl}`) const sub = createSubscription(params.pool, [params.relayUrl], params.filters) params.setSyncSubscription(sub) const relayEvents: Event[] = [] let eventCount = 0 const finalize = (): void => { sub.unsub() params.setSyncSubscription(null) } await new Promise((resolve) => { sub.on('event', (event: Event): void => { eventCount = handleRelaySyncEvent({ event, relayUrl: params.relayUrl, relayEvents, eventCount }) }) sub.on('eose', (): void => { console.warn(`[PlatformSync] Relay ${params.relayUrl} sent EOSE signal`) finalize() resolve() }) const timeoutId = setTimeout((): void => { console.warn(`[PlatformSync] Relay ${params.relayUrl} timeout after ${params.syncTimeoutMs}ms`) finalize() resolve() }, params.syncTimeoutMs) timeoutId.unref?.() }) console.warn( `[PlatformSync] Relay ${params.relayUrl} completed: received ${eventCount} total events from relay, ${relayEvents.length} filtered with service='${PLATFORM_SERVICE}'` ) return relayEvents } function handleRelaySyncEvent(params: { event: Event relayUrl: string relayEvents: Event[] eventCount: number }): number { const nextCount = params.eventCount + 1 logRelaySyncProgress({ relayUrl: params.relayUrl, eventCount: nextCount }) logTargetEventReceived({ relayUrl: params.relayUrl, event: params.event, eventCount: nextCount }) const tags = extractTagsFromEvent(params.event) logTargetEventTags({ event: params.event, tags }) if (tags.service === PLATFORM_SERVICE) { params.relayEvents.push(params.event) logTargetEventAccepted(params.event) } else { logTargetEventRejected({ event: params.event, tags }) } return nextCount } function addUniqueEvents(params: { relayEvents: Event[]; processedEventIds: Set; allEvents: Event[] }): void { for (const event of params.relayEvents) { if (!params.processedEventIds.has(event.id)) { params.processedEventIds.add(event.id) params.allEvents.push(event) } } } async function updateRelayLastSyncDateIfNeeded(params: { relayUrl: string; eventCount: number }): Promise { if (params.eventCount <= 0) { return } const { configStorage } = await import('../configStorage') const config = await configStorage.getConfig() const relayConfig = config.relays.find((r) => r.url === params.relayUrl) if (!relayConfig) { return } await configStorage.updateRelay(relayConfig.id, { lastSyncDate: Date.now() }) }