189 lines
5.9 KiB
TypeScript
189 lines
5.9 KiB
TypeScript
import type { Event, Filter } from 'nostr-tools'
|
|
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
|
|
import { createSubscription } from '@/types/nostr-tools-extended'
|
|
import { PLATFORM_SERVICE } from '../platformConfig'
|
|
import { extractTagsFromEvent } from '../nostrTagSystem'
|
|
import {
|
|
logRelaySyncProgress,
|
|
logTargetEventAccepted,
|
|
logTargetEventReceived,
|
|
logTargetEventRejected,
|
|
logTargetEventTags,
|
|
} from './debug'
|
|
|
|
/**
 * Progress snapshot handed to a {@link SyncProgressManager}.
 */
export type SyncProgress = {
  /** Step value being reported. */
  currentStep: number
  /** Total number of steps; this module passes the number of active relays. */
  totalSteps: number
  /** Completion flag; the updates emitted by this module always set it to `false`. */
  completed: boolean
  /** URL of the relay the update refers to, when there is one. */
  currentRelay?: string
}
|
|
|
|
/**
 * Sink for progress updates produced while synchronizing.
 */
export type SyncProgressManager = {
  /** Receives a progress snapshot; the signature also accepts `null`. */
  setProgress: (progress: SyncProgress | null) => void
}
|
|
|
|
/**
 * Minimal relay-session contract this module depends on.
 */
export type RelaySessionManager = {
  /** Called with the relay URL when synchronizing that relay threw an error. */
  markRelayFailed: (relayUrl: string) => void
}
|
|
|
|
/**
 * Callback that registers the subscription currently used for syncing.
 *
 * It is called with the open subscription (anything exposing `unsub`) and later with `null`
 * once that subscription has been closed.
 */
export type SetSyncSubscription = (
  sub: { unsub: () => void } | null
) => void
|
|
|
|
/**
 * Gathers events from every relay listed in `activeRelays`, one relay at a time.
 *
 * Each relay is awaited before the next one starts. All relays share a single result array
 * and a single set of already-seen event ids, so an event id appears at most once in the
 * returned array. Falsy entries in `activeRelays` are skipped.
 *
 * @param params - Sync dependencies and settings; forwarded as-is to the per-relay collector.
 * @returns The events accumulated across all relays.
 */
export async function collectEventsFromRelays(params: {
  pool: SimplePoolWithSub
  activeRelays: string[]
  filters: Filter[]
  relaySessionManager: RelaySessionManager
  syncProgressManager: SyncProgressManager
  syncTimeoutMs: number
  setSyncSubscription: SetSyncSubscription
}): Promise<Event[]> {
  const { activeRelays } = params
  const collected: Event[] = []
  const seenIds = new Set<string>()

  for (let relayIndex = 0; relayIndex < activeRelays.length; relayIndex += 1) {
    const relayUrl = activeRelays[relayIndex]
    if (!relayUrl) {
      continue
    }

    // Sequential on purpose: the next relay only starts once this one has settled.
    await collectEventsFromRelay({
      ...params,
      relayUrl,
      relayIndex,
      processedEventIds: seenIds,
      allEvents: collected,
    })
  }

  return collected
}
|
|
|
|
/**
 * Synchronizes one relay: reads its events, merges the new ones into the shared result,
 * records the sync date and reports progress.
 *
 * Errors thrown while reading the relay, merging, or updating the sync date are not rethrown:
 * they are logged, the relay is reported through `relaySessionManager.markRelayFailed`, and
 * progress still advances so the caller can continue with the next relay.
 *
 * @param params.relayIndex - Zero-based position of `relayUrl` in `activeRelays`.
 * @param params.processedEventIds - Shared set of event ids already collected (mutated).
 * @param params.allEvents - Shared result array (mutated).
 */
async function collectEventsFromRelay(params: {
  pool: SimplePoolWithSub
  relayUrl: string
  relayIndex: number
  activeRelays: string[]
  filters: Filter[]
  relaySessionManager: RelaySessionManager
  syncProgressManager: SyncProgressManager
  processedEventIds: Set<string>
  allEvents: Event[]
  syncTimeoutMs: number
  setSyncSubscription: SetSyncSubscription
}): Promise<void> {
  const reportProgress = (currentStep: number): void => {
    setRelayProgress(params.syncProgressManager, currentStep, params.activeRelays.length, params.relayUrl)
  }

  // When this relay starts, `relayIndex` relays have already been processed. Reporting that
  // count (instead of a hard-coded 0) keeps the reported step from jumping back to zero
  // every time a new relay begins.
  reportProgress(params.relayIndex)

  try {
    const relayEvents = await readEventsFromRelay({
      pool: params.pool,
      relayUrl: params.relayUrl,
      filters: params.filters,
      syncTimeoutMs: params.syncTimeoutMs,
      setSyncSubscription: params.setSyncSubscription,
    })

    addUniqueEvents({ relayEvents, processedEventIds: params.processedEventIds, allEvents: params.allEvents })
    await updateRelayLastSyncDateIfNeeded({ relayUrl: params.relayUrl, eventCount: relayEvents.length })
    reportProgress(params.relayIndex + 1)
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error)
    console.warn(`[PlatformSync] Relay ${params.relayUrl} failed: ${errorMessage}`)
    params.relaySessionManager.markRelayFailed(params.relayUrl)
    // A failed relay still counts as processed for progress purposes.
    reportProgress(params.relayIndex + 1)
  }
}
|
|
|
|
/**
 * Publishes a not-yet-completed progress update for the given relay.
 *
 * @param syncProgressManager - Receiver of the update.
 * @param currentStep - Step value to report.
 * @param totalSteps - Total number of steps to report.
 * @param relayUrl - Relay URL reported as `currentRelay`.
 */
function setRelayProgress(
  syncProgressManager: SyncProgressManager,
  currentStep: number,
  totalSteps: number,
  relayUrl: string
): void {
  const inProgressUpdate = {
    currentStep,
    totalSteps,
    completed: false,
    currentRelay: relayUrl,
  }
  syncProgressManager.setProgress(inProgressUpdate)
}
|
|
|
|
/**
 * Subscribes to a single relay and collects events until the relay signals EOSE or
 * `syncTimeoutMs` elapses, whichever happens first.
 *
 * The open subscription is registered through `setSyncSubscription` and replaced with `null`
 * once it has been closed. Each incoming event is passed to `handleRelaySyncEvent`, which
 * decides whether it is kept.
 *
 * @param params.pool - Pool used to create the subscription.
 * @param params.relayUrl - The only relay the subscription targets.
 * @param params.filters - Filters given to the subscription.
 * @param params.syncTimeoutMs - Maximum time to wait for EOSE, in milliseconds.
 * @param params.setSyncSubscription - Registers / clears the active subscription.
 * @returns The events kept for this relay.
 */
async function readEventsFromRelay(params: {
  pool: SimplePoolWithSub
  relayUrl: string
  filters: Filter[]
  syncTimeoutMs: number
  setSyncSubscription: SetSyncSubscription
}): Promise<Event[]> {
  console.warn(`[PlatformSync] Synchronizing from relay: ${params.relayUrl}`)

  const sub = createSubscription(params.pool, [params.relayUrl], params.filters)
  params.setSyncSubscription(sub)

  const relayEvents: Event[] = []
  let eventCount = 0

  await new Promise<void>((resolve) => {
    let settled = false
    let timeoutId: ReturnType<typeof setTimeout> | undefined

    // Tears the subscription down exactly once. Without this guard the timeout would still
    // fire after an EOSE, log a bogus timeout, call `unsub()` a second time and, because
    // relays are processed one after another, reset the subscription registered by the
    // relay that is being synchronized by then.
    const settle = (): void => {
      if (settled) {
        return
      }
      settled = true
      if (timeoutId !== undefined) {
        clearTimeout(timeoutId)
      }
      sub.unsub()
      params.setSyncSubscription(null)
      resolve()
    }

    sub.on('event', (event: Event): void => {
      if (settled) {
        // The result has already been handed back; ignore stragglers.
        return
      }
      eventCount = handleRelaySyncEvent({ event, relayUrl: params.relayUrl, relayEvents, eventCount })
    })

    sub.on('eose', (): void => {
      if (settled) {
        return
      }
      console.warn(`[PlatformSync] Relay ${params.relayUrl} sent EOSE signal`)
      settle()
    })

    const timer = setTimeout((): void => {
      if (settled) {
        return
      }
      console.warn(`[PlatformSync] Relay ${params.relayUrl} timeout after ${params.syncTimeoutMs}ms`)
      settle()
    }, params.syncTimeoutMs)

    // Where the timer handle supports it, do not let the pending timer keep the process alive.
    timer.unref?.()
    timeoutId = timer
  })

  console.warn(
    `[PlatformSync] Relay ${params.relayUrl} completed: received ${eventCount} total events from relay, ${relayEvents.length} filtered with service='${PLATFORM_SERVICE}'`
  )

  return relayEvents
}
|
|
|
|
/**
 * Processes one event received from a relay.
 *
 * The event is counted and logged, its tags are extracted, and it is appended to
 * `relayEvents` only when the extracted `service` tag equals `PLATFORM_SERVICE`.
 *
 * @param params.event - The event received from the relay.
 * @param params.relayUrl - Relay that delivered the event (used for logging).
 * @param params.relayEvents - Accumulator of kept events (mutated).
 * @param params.eventCount - Number of events received before this one.
 * @returns The received-event count including this event.
 */
function handleRelaySyncEvent(params: {
  event: Event
  relayUrl: string
  relayEvents: Event[]
  eventCount: number
}): number {
  const { event, relayUrl, relayEvents } = params
  const receivedSoFar = params.eventCount + 1

  logRelaySyncProgress({ relayUrl, eventCount: receivedSoFar })
  logTargetEventReceived({ relayUrl, event, eventCount: receivedSoFar })

  const tags = extractTagsFromEvent(event)
  logTargetEventTags({ event, tags })

  // Events that belong to another service are counted but not kept.
  if (tags.service !== PLATFORM_SERVICE) {
    logTargetEventRejected({ event, tags })
    return receivedSoFar
  }

  relayEvents.push(event)
  logTargetEventAccepted(event)
  return receivedSoFar
}
|
|
|
|
/**
 * Appends to `allEvents` every event of `relayEvents` whose id has not been seen yet.
 *
 * Both `processedEventIds` and `allEvents` are mutated in place; input order is preserved
 * and the first occurrence of an id is the one that is kept.
 */
function addUniqueEvents(params: { relayEvents: Event[]; processedEventIds: Set<string>; allEvents: Event[] }): void {
  const { relayEvents, processedEventIds: seenIds, allEvents: target } = params

  for (const candidate of relayEvents) {
    if (seenIds.has(candidate.id)) {
      continue
    }
    seenIds.add(candidate.id)
    target.push(candidate)
  }
}
|
|
|
|
/**
 * Stores the current time as `lastSyncDate` on the saved relay configuration whose `url`
 * matches `relayUrl`.
 *
 * Nothing is written when `eventCount` is zero or negative, or when no stored relay
 * configuration has that URL. The config storage module is loaded on demand.
 *
 * @param params.relayUrl - URL used to look up the stored relay configuration.
 * @param params.eventCount - Number of events obtained from the relay.
 */
async function updateRelayLastSyncDateIfNeeded(params: { relayUrl: string; eventCount: number }): Promise<void> {
  const { relayUrl, eventCount } = params
  if (eventCount <= 0) {
    return
  }

  const storageModule = await import('../configStorage')
  const storage = storageModule.configStorage
  const { relays } = await storage.getConfig()

  const matchingRelay = relays.find((candidate) => candidate.url === relayUrl)
  if (matchingRelay) {
    await storage.updateRelay(matchingRelay.id, { lastSyncDate: Date.now() })
  }
}
|