/** * Background sync service that scans all notes with service='zapwall.fr' tag * and caches them in IndexedDB * Runs in background (non-blocking) and updates cache when new notes are published */ import type { Event } from 'nostr-tools' import type { SimplePoolWithSub } from '@/types/nostr-tools-extended' import { nostrService } from './nostr' import { PLATFORM_SERVICE, MIN_EVENT_DATE } from './platformConfig' import { buildTagFilter, extractTagsFromEvent } from './nostrTagSystem' import { objectCache } from './objectCache' import { parsePresentationEvent } from './articlePublisherHelpersPresentation' import { parseArticleFromEvent, parseSeriesFromEvent, parseReviewFromEvent, parsePurchaseFromEvent, parseReviewTipFromEvent, parseSponsoringFromEvent } from './nostrEventParsing' class PlatformSyncService { private syncInProgress = false private syncSubscription: { unsub: () => void } | null = null private lastSyncTime: number = 0 private readonly SYNC_INTERVAL_MS = 60000 // Sync every minute private readonly SYNC_TIMEOUT_MS = 30000 // 30 seconds timeout per sync /** * Start background sync * Scans all notes with service='zapwall.fr' and caches them * Does not require pubkey - syncs all public notes with service='zapwall.fr' */ async startSync(): Promise { if (this.syncInProgress) { return } const pool = nostrService.getPool() if (!pool) { console.warn('Pool not initialized, cannot start platform sync') return } this.syncInProgress = true // Update progress manager to show sync indicator const { syncProgressManager } = await import('./syncProgressManager') const { relaySessionManager } = await import('./relaySessionManager') const activeRelays = await relaySessionManager.getActiveRelays() const initialRelay = activeRelays[0] ?? 'Connecting...' const totalRelays = activeRelays.length || 1 syncProgressManager.setProgress({ currentStep: 0, totalSteps: totalRelays, completed: false, currentRelay: initialRelay }) try { await this.performSync(pool as unknown as SimplePoolWithSub) // Mark as completed after all relays are processed const finalRelay = activeRelays[activeRelays.length - 1] ?? initialRelay syncProgressManager.setProgress({ currentStep: totalRelays, totalSteps: totalRelays, completed: true, currentRelay: finalRelay }) } catch (error) { console.error('Error in platform sync:', error) syncProgressManager.setProgress(null) } finally { this.syncInProgress = false // Clear progress after a short delay setTimeout(() => { syncProgressManager.setProgress(null) }, 500) } } /** * Perform a sync operation * Scans all notes with service='zapwall.fr' tag from ALL active relays * Starts from January 5, 2026 00:00:00 UTC */ private async performSync(pool: SimplePoolWithSub): Promise { const filters = [ { ...buildTagFilter({ service: PLATFORM_SERVICE, }), since: MIN_EVENT_DATE, // January 5, 2026 00:00:00 UTC limit: 1000, // Get up to 1000 events per sync }, ] const { relaySessionManager } = await import('./relaySessionManager') const { syncProgressManager } = await import('./syncProgressManager') const activeRelays = await relaySessionManager.getActiveRelays() if (activeRelays.length === 0) { throw new Error('No active relays available') } const allEvents: Event[] = [] const processedEventIds = new Set() // Synchronize from all active relays for (let i = 0; i < activeRelays.length; i++) { const relayUrl = activeRelays[i] if (!relayUrl) { continue } // Update progress with current relay syncProgressManager.setProgress({ currentStep: 0, totalSteps: activeRelays.length, completed: false, currentRelay: relayUrl, }) try { console.log(`[PlatformSync] Synchronizing from relay ${i + 1}/${activeRelays.length}: ${relayUrl}`) const { createSubscription } = require('@/types/nostr-tools-extended') const sub = createSubscription(pool, [relayUrl], filters) const relayEvents: Event[] = [] let resolved = false const finalize = (): void => { if (resolved) { return } resolved = true sub.unsub() // Deduplicate events by ID before adding to allEvents for (const event of relayEvents) { if (!processedEventIds.has(event.id)) { processedEventIds.add(event.id) allEvents.push(event) } } console.log(`[PlatformSync] Relay ${relayUrl} completed, received ${relayEvents.length} events`) } await new Promise((resolve) => { sub.on('event', (event: Event): void => { // Only process events with service='zapwall.fr' const tags = extractTagsFromEvent(event) if (tags.service === PLATFORM_SERVICE) { relayEvents.push(event) } }) sub.on('eose', (): void => { finalize() resolve() }) // Timeout after SYNC_TIMEOUT_MS setTimeout((): void => { finalize() resolve() }, this.SYNC_TIMEOUT_MS).unref?.() this.syncSubscription = sub }) // Update progress after each relay syncProgressManager.setProgress({ currentStep: i + 1, totalSteps: activeRelays.length, completed: false, currentRelay: relayUrl, }) } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error) console.warn(`[PlatformSync] Relay ${relayUrl} failed: ${errorMessage}`) // Mark relay as failed but continue with next relay relaySessionManager.markRelayFailed(relayUrl) // Update progress even on failure syncProgressManager.setProgress({ currentStep: i + 1, totalSteps: activeRelays.length, completed: false, currentRelay: relayUrl, }) } } // Process all collected events await this.processAndCacheEvents(allEvents) console.log(`[PlatformSync] Total events collected from all relays: ${allEvents.length}`) this.lastSyncTime = Date.now() } /** * Process events and cache them by type */ private async processAndCacheEvents(events: Event[]): Promise { for (const event of events) { try { await this.processEvent(event) } catch (error) { console.error('Error processing event:', error, event.id) // Continue processing other events even if one fails } } } /** * Process a single event and cache it */ private async processEvent(event: Event): Promise { const tags = extractTagsFromEvent(event) // Skip hidden events if (tags.hidden) { return } // Try to parse and cache by type if (tags.type === 'author') { const parsed = await parsePresentationEvent(event) if (parsed && parsed.hash) { await objectCache.set('author', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index) } } else if (tags.type === 'series') { const parsed = await parseSeriesFromEvent(event) if (parsed && parsed.hash) { await objectCache.set('series', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index) } } else if (tags.type === 'publication') { const parsed = await parseArticleFromEvent(event) if (parsed && parsed.hash) { await objectCache.set('publication', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index) } } else if (tags.type === 'quote') { const parsed = await parseReviewFromEvent(event) if (parsed && parsed.hash) { await objectCache.set('review', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index) } } else if (event.kind === 9735) { // Zap receipts (kind 9735) can be sponsoring, purchase, or review_tip const sponsoring = await parseSponsoringFromEvent(event) if (sponsoring && sponsoring.hash) { await objectCache.set('sponsoring', sponsoring.hash, event, sponsoring, 0, false, sponsoring.index) } else { const purchase = await parsePurchaseFromEvent(event) if (purchase && purchase.hash) { await objectCache.set('purchase', purchase.hash, event, purchase, 0, false, purchase.index) } else { const reviewTip = await parseReviewTipFromEvent(event) if (reviewTip && reviewTip.hash) { await objectCache.set('review_tip', reviewTip.hash, event, reviewTip, 0, false, reviewTip.index) } } } } } /** * Start continuous sync (runs periodically) */ startContinuousSync(): void { // Start initial sync void this.startSync() // Schedule periodic syncs setInterval(() => { if (!this.syncInProgress) { void this.startSync() } }, this.SYNC_INTERVAL_MS) } /** * Stop sync */ stopSync(): void { if (this.syncSubscription) { this.syncSubscription.unsub() this.syncSubscription = null } this.syncInProgress = false } /** * Check if sync is in progress */ isSyncing(): boolean { return this.syncInProgress } /** * Get last sync time */ getLastSyncTime(): number { return this.lastSyncTime } } export const platformSyncService = new PlatformSyncService()