199 lines
6.1 KiB
TypeScript
199 lines
6.1 KiB
TypeScript
import type { Event, Filter } from 'nostr-tools'
|
|
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
|
|
import { nostrService } from '../nostr'
|
|
import { MIN_EVENT_DATE, PLATFORM_SERVICE } from '../platformConfig'
|
|
import { cachePlatformEvent } from './cacheEvent'
|
|
import { SYNC_INTERVAL_MS, SYNC_TIMEOUT_MS } from './constants'
|
|
import { collectEventsFromRelays, type RelaySessionManager, type SyncProgressManager } from './relayCollection'
|
|
|
|
export class PlatformSyncService {
|
|
private syncInProgress = false
|
|
private syncSubscription: { unsub: () => void } | null = null
|
|
private lastSyncTime = 0
|
|
|
|
/**
|
|
* Start background sync
|
|
* Scans all notes with service='zapwall.fr' and caches them
|
|
* Does not require pubkey - syncs all public notes with service='zapwall.fr'
|
|
*/
|
|
public async startSync(): Promise<void> {
|
|
if (this.syncInProgress) {
|
|
return
|
|
}
|
|
|
|
const pool = nostrService.getPool()
|
|
if (!pool) {
|
|
console.warn('Pool not initialized, cannot start platform sync')
|
|
return
|
|
}
|
|
|
|
this.syncInProgress = true
|
|
|
|
const { relaySessionManager, syncProgressManager, activeRelays } = await this.loadSyncManagers()
|
|
const initialRelay = activeRelays[0] ?? 'Connecting...'
|
|
const totalRelays = activeRelays.length ?? 1
|
|
syncProgressManager.setProgress({ currentStep: 0, totalSteps: totalRelays, completed: false, currentRelay: initialRelay })
|
|
|
|
try {
|
|
await this.performSync({ pool: pool as unknown as SimplePoolWithSub, relaySessionManager, syncProgressManager, activeRelays })
|
|
const finalRelay = activeRelays[activeRelays.length - 1] ?? initialRelay
|
|
syncProgressManager.setProgress({ currentStep: totalRelays, totalSteps: totalRelays, completed: true, currentRelay: finalRelay })
|
|
} catch (error) {
|
|
console.error('Error in platform sync:', error)
|
|
syncProgressManager.setProgress(null)
|
|
} finally {
|
|
this.syncInProgress = false
|
|
setTimeout(() => {
|
|
syncProgressManager.setProgress(null)
|
|
}, 500)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Start continuous sync (runs periodically)
|
|
* Can use Service Worker if available, otherwise falls back to setInterval
|
|
*/
|
|
public async startContinuousSync(): Promise<void> {
|
|
if (typeof window !== 'undefined') {
|
|
const usedSw = await this.tryStartServiceWorkerSync()
|
|
if (usedSw) {
|
|
void this.startSync()
|
|
return
|
|
}
|
|
}
|
|
|
|
void this.startSync()
|
|
setInterval(() => {
|
|
if (!this.syncInProgress) {
|
|
void this.startSync()
|
|
}
|
|
}, SYNC_INTERVAL_MS)
|
|
}
|
|
|
|
/**
|
|
* Stop sync
|
|
*/
|
|
public async stopSync(): Promise<void> {
|
|
if (typeof window !== 'undefined') {
|
|
await this.tryStopServiceWorkerSync()
|
|
}
|
|
|
|
if (this.syncSubscription) {
|
|
this.syncSubscription.unsub()
|
|
this.syncSubscription = null
|
|
}
|
|
this.syncInProgress = false
|
|
}
|
|
|
|
/**
|
|
* Check if sync is in progress
|
|
*/
|
|
public isSyncing(): boolean {
|
|
return this.syncInProgress
|
|
}
|
|
|
|
/**
|
|
* Get last sync time
|
|
*/
|
|
public getLastSyncTime(): number {
|
|
return this.lastSyncTime
|
|
}
|
|
|
|
/**
|
|
* Perform a sync operation
|
|
* Scans all notes with service='zapwall.fr' tag from ALL active relays
|
|
* Starts from January 5, 2026 00:00:00 UTC
|
|
*/
|
|
private async performSync(params: {
|
|
pool: SimplePoolWithSub
|
|
relaySessionManager: RelaySessionManager
|
|
syncProgressManager: SyncProgressManager
|
|
activeRelays: string[]
|
|
}): Promise<void> {
|
|
const filters = buildSyncFilters()
|
|
logSyncStart(filters)
|
|
|
|
if (params.activeRelays.length === 0) {
|
|
throw new Error('No active relays available')
|
|
}
|
|
|
|
const allEvents = await collectEventsFromRelays({
|
|
pool: params.pool,
|
|
activeRelays: params.activeRelays,
|
|
filters,
|
|
relaySessionManager: params.relaySessionManager,
|
|
syncProgressManager: params.syncProgressManager,
|
|
syncTimeoutMs: SYNC_TIMEOUT_MS,
|
|
setSyncSubscription: (sub): void => {
|
|
this.syncSubscription = sub
|
|
},
|
|
})
|
|
|
|
await processAndCacheEvents(allEvents)
|
|
console.warn(`[PlatformSync] Total events collected from all relays: ${allEvents.length}`)
|
|
this.lastSyncTime = Date.now()
|
|
}
|
|
|
|
private async loadSyncManagers(): Promise<{
|
|
relaySessionManager: RelaySessionManager & { getActiveRelays: () => Promise<string[]> }
|
|
syncProgressManager: SyncProgressManager
|
|
activeRelays: string[]
|
|
}> {
|
|
const { syncProgressManager } = await import('../syncProgressManager')
|
|
const { relaySessionManager } = await import('../relaySessionManager')
|
|
const activeRelays = await relaySessionManager.getActiveRelays()
|
|
return { relaySessionManager, syncProgressManager, activeRelays }
|
|
}
|
|
|
|
private async tryStartServiceWorkerSync(): Promise<boolean> {
|
|
try {
|
|
const { swClient } = await import('../swClient')
|
|
const isReady = await swClient.isReady()
|
|
if (!isReady) {
|
|
return false
|
|
}
|
|
console.warn('[PlatformSync] Using Service Worker for background sync')
|
|
await swClient.startPlatformSync()
|
|
return true
|
|
} catch (error) {
|
|
console.warn('[PlatformSync] Service Worker not available, using setInterval:', error)
|
|
return false
|
|
}
|
|
}
|
|
|
|
private async tryStopServiceWorkerSync(): Promise<void> {
|
|
try {
|
|
const { swClient } = await import('../swClient')
|
|
const isReady = await swClient.isReady()
|
|
if (isReady) {
|
|
await swClient.stopPlatformSync()
|
|
}
|
|
} catch {
|
|
// Ignore errors
|
|
}
|
|
}
|
|
}
|
|
|
|
function buildSyncFilters(): Filter[] {
|
|
return [{ kinds: [1], since: MIN_EVENT_DATE, limit: 1000 }]
|
|
}
|
|
|
|
function logSyncStart(filters: Filter[]): void {
|
|
console.warn(
|
|
`[PlatformSync] Starting sync with filter (no #service filter on relay side):`,
|
|
JSON.stringify(filters, null, 2)
|
|
)
|
|
console.warn(`[PlatformSync] MIN_EVENT_DATE: ${MIN_EVENT_DATE} (${new Date(MIN_EVENT_DATE * 1000).toISOString()})`)
|
|
console.warn(`[PlatformSync] Will filter by service='${PLATFORM_SERVICE}' client-side after receiving events`)
|
|
}
|
|
|
|
async function processAndCacheEvents(events: Event[]): Promise<void> {
|
|
for (const event of events) {
|
|
try {
|
|
await cachePlatformEvent(event)
|
|
} catch (error) {
|
|
console.error('Error processing event:', error, event.id)
|
|
}
|
|
}
|
|
}
|