2026-01-13 14:49:19 +01:00

189 lines
5.9 KiB
TypeScript

import type { Event, Filter } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { createSubscription } from '@/types/nostr-tools-extended'
import { PLATFORM_SERVICE } from '../platformConfig'
import { extractTagsFromEvent } from '../nostrTagSystem'
import {
logRelaySyncProgress,
logTargetEventAccepted,
logTargetEventReceived,
logTargetEventRejected,
logTargetEventTags,
} from './debug'
export type SyncProgress = { currentStep: number; totalSteps: number; completed: boolean; currentRelay?: string }
export type SyncProgressManager = { setProgress: (progress: SyncProgress | null) => void }
export type RelaySessionManager = {
markRelayFailed: (relayUrl: string) => void
}
export type SetSyncSubscription = (sub: { unsub: () => void } | null) => void
export async function collectEventsFromRelays(params: {
pool: SimplePoolWithSub
activeRelays: string[]
filters: Filter[]
relaySessionManager: RelaySessionManager
syncProgressManager: SyncProgressManager
syncTimeoutMs: number
setSyncSubscription: SetSyncSubscription
}): Promise<Event[]> {
const allEvents: Event[] = []
const processedEventIds = new Set<string>()
for (let i = 0; i < params.activeRelays.length; i++) {
const relayUrl = params.activeRelays[i]
if (relayUrl) {
await collectEventsFromRelay({
...params,
relayUrl,
relayIndex: i,
processedEventIds,
allEvents,
})
}
}
return allEvents
}
async function collectEventsFromRelay(params: {
pool: SimplePoolWithSub
relayUrl: string
relayIndex: number
activeRelays: string[]
filters: Filter[]
relaySessionManager: RelaySessionManager
syncProgressManager: SyncProgressManager
processedEventIds: Set<string>
allEvents: Event[]
syncTimeoutMs: number
setSyncSubscription: SetSyncSubscription
}): Promise<void> {
setRelayProgress(params.syncProgressManager, 0, params.activeRelays.length, params.relayUrl)
try {
const relayEvents = await readEventsFromRelay({
pool: params.pool,
relayUrl: params.relayUrl,
filters: params.filters,
syncTimeoutMs: params.syncTimeoutMs,
setSyncSubscription: params.setSyncSubscription,
})
addUniqueEvents({ relayEvents, processedEventIds: params.processedEventIds, allEvents: params.allEvents })
await updateRelayLastSyncDateIfNeeded({ relayUrl: params.relayUrl, eventCount: relayEvents.length })
setRelayProgress(params.syncProgressManager, params.relayIndex + 1, params.activeRelays.length, params.relayUrl)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
console.warn(`[PlatformSync] Relay ${params.relayUrl} failed: ${errorMessage}`)
params.relaySessionManager.markRelayFailed(params.relayUrl)
setRelayProgress(params.syncProgressManager, params.relayIndex + 1, params.activeRelays.length, params.relayUrl)
}
}
function setRelayProgress(
syncProgressManager: SyncProgressManager,
currentStep: number,
totalSteps: number,
relayUrl: string
): void {
syncProgressManager.setProgress({ currentStep, totalSteps, completed: false, currentRelay: relayUrl })
}
async function readEventsFromRelay(params: {
pool: SimplePoolWithSub
relayUrl: string
filters: Filter[]
syncTimeoutMs: number
setSyncSubscription: SetSyncSubscription
}): Promise<Event[]> {
console.warn(`[PlatformSync] Synchronizing from relay: ${params.relayUrl}`)
const sub = createSubscription(params.pool, [params.relayUrl], params.filters)
params.setSyncSubscription(sub)
const relayEvents: Event[] = []
let eventCount = 0
const finalize = (): void => {
sub.unsub()
params.setSyncSubscription(null)
}
await new Promise<void>((resolve) => {
sub.on('event', (event: Event): void => {
eventCount = handleRelaySyncEvent({ event, relayUrl: params.relayUrl, relayEvents, eventCount })
})
sub.on('eose', (): void => {
console.warn(`[PlatformSync] Relay ${params.relayUrl} sent EOSE signal`)
finalize()
resolve()
})
const timeoutId = setTimeout((): void => {
console.warn(`[PlatformSync] Relay ${params.relayUrl} timeout after ${params.syncTimeoutMs}ms`)
finalize()
resolve()
}, params.syncTimeoutMs)
timeoutId.unref?.()
})
console.warn(
`[PlatformSync] Relay ${params.relayUrl} completed: received ${eventCount} total events from relay, ${relayEvents.length} filtered with service='${PLATFORM_SERVICE}'`
)
return relayEvents
}
function handleRelaySyncEvent(params: {
event: Event
relayUrl: string
relayEvents: Event[]
eventCount: number
}): number {
const nextCount = params.eventCount + 1
logRelaySyncProgress({ relayUrl: params.relayUrl, eventCount: nextCount })
logTargetEventReceived({ relayUrl: params.relayUrl, event: params.event, eventCount: nextCount })
const tags = extractTagsFromEvent(params.event)
logTargetEventTags({ event: params.event, tags })
if (tags.service === PLATFORM_SERVICE) {
params.relayEvents.push(params.event)
logTargetEventAccepted(params.event)
} else {
logTargetEventRejected({ event: params.event, tags })
}
return nextCount
}
function addUniqueEvents(params: { relayEvents: Event[]; processedEventIds: Set<string>; allEvents: Event[] }): void {
for (const event of params.relayEvents) {
if (!params.processedEventIds.has(event.id)) {
params.processedEventIds.add(event.id)
params.allEvents.push(event)
}
}
}
async function updateRelayLastSyncDateIfNeeded(params: { relayUrl: string; eventCount: number }): Promise<void> {
if (params.eventCount <= 0) {
return
}
const { configStorage } = await import('../configStorage')
const config = await configStorage.getConfig()
const relayConfig = config.relays.find((r) => r.url === params.relayUrl)
if (!relayConfig) {
return
}
await configStorage.updateRelay(relayConfig.id, { lastSyncDate: Date.now() })
}