story-research-zapwall/lib/platformSync.ts
2026-01-07 00:05:01 +01:00

367 lines
14 KiB
TypeScript

/**
* Background sync service that scans all notes with service='zapwall.fr' tag
* and caches them in IndexedDB
* Runs in background (non-blocking) and updates cache when new notes are published
*/
import type { Event } from 'nostr-tools'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
import { nostrService } from './nostr'
import { PLATFORM_SERVICE, MIN_EVENT_DATE } from './platformConfig'
import { extractTagsFromEvent } from './nostrTagSystem'
import { objectCache } from './objectCache'
import { parsePresentationEvent } from './articlePublisherHelpersPresentation'
import { parseArticleFromEvent, parseSeriesFromEvent, parseReviewFromEvent, parsePurchaseFromEvent, parseReviewTipFromEvent, parseSponsoringFromEvent } from './nostrEventParsing'
class PlatformSyncService {
private syncInProgress = false
private syncSubscription: { unsub: () => void } | null = null
private lastSyncTime: number = 0
private readonly SYNC_INTERVAL_MS = 60000 // Sync every minute
private readonly SYNC_TIMEOUT_MS = 60000 // 60 seconds timeout per relay (increased from 30s)
/**
* Start background sync
* Scans all notes with service='zapwall.fr' and caches them
* Does not require pubkey - syncs all public notes with service='zapwall.fr'
*/
async startSync(): Promise<void> {
if (this.syncInProgress) {
return
}
const pool = nostrService.getPool()
if (!pool) {
console.warn('Pool not initialized, cannot start platform sync')
return
}
this.syncInProgress = true
// Update progress manager to show sync indicator
const { syncProgressManager } = await import('./syncProgressManager')
const { relaySessionManager } = await import('./relaySessionManager')
const activeRelays = await relaySessionManager.getActiveRelays()
const initialRelay = activeRelays[0] ?? 'Connecting...'
const totalRelays = activeRelays.length ?? 1
syncProgressManager.setProgress({ currentStep: 0, totalSteps: totalRelays, completed: false, currentRelay: initialRelay })
try {
await this.performSync(pool as unknown as SimplePoolWithSub)
// Mark as completed after all relays are processed
const finalRelay = activeRelays[activeRelays.length - 1] ?? initialRelay
syncProgressManager.setProgress({ currentStep: totalRelays, totalSteps: totalRelays, completed: true, currentRelay: finalRelay })
} catch (error) {
console.error('Error in platform sync:', error)
syncProgressManager.setProgress(null)
} finally {
this.syncInProgress = false
// Clear progress after a short delay
setTimeout(() => {
syncProgressManager.setProgress(null)
}, 500)
}
}
/**
* Perform a sync operation
* Scans all notes with service='zapwall.fr' tag from ALL active relays
* Starts from January 5, 2026 00:00:00 UTC
*/
private async performSync(pool: SimplePoolWithSub): Promise<void> {
// Don't filter by #service on relay side - some relays don't support it well
// Instead, fetch all kind 1 events since MIN_EVENT_DATE and filter client-side
const filters = [
{
kinds: [1],
since: MIN_EVENT_DATE, // January 5, 2026 00:00:00 UTC
limit: 1000, // Get up to 1000 events per sync
},
]
console.warn(`[PlatformSync] Starting sync with filter (no #service filter on relay side):`, JSON.stringify(filters, null, 2))
console.warn(`[PlatformSync] MIN_EVENT_DATE: ${MIN_EVENT_DATE} (${new Date(MIN_EVENT_DATE * 1000).toISOString()})`)
console.warn(`[PlatformSync] Will filter by service='${PLATFORM_SERVICE}' client-side after receiving events`)
const { relaySessionManager } = await import('./relaySessionManager')
const { syncProgressManager } = await import('./syncProgressManager')
const activeRelays = await relaySessionManager.getActiveRelays()
if (activeRelays.length === 0) {
throw new Error('No active relays available')
}
const allEvents: Event[] = []
const processedEventIds = new Set<string>()
// Synchronize from all active relays
for (let i = 0; i < activeRelays.length; i++) {
const relayUrl = activeRelays[i]
if (!relayUrl) {
continue
}
// Update progress with current relay
syncProgressManager.setProgress({
currentStep: 0,
totalSteps: activeRelays.length,
completed: false,
currentRelay: relayUrl,
})
try {
console.warn(`[PlatformSync] Synchronizing from relay ${i + 1}/${activeRelays.length}: ${relayUrl}`)
const { createSubscription } = require('@/types/nostr-tools-extended')
const sub = createSubscription(pool, [relayUrl], filters)
const relayEvents: Event[] = []
let resolved = false
let eventCount = 0
const finalize = (): void => {
if (resolved) {
return
}
resolved = true
sub.unsub()
// Deduplicate events by ID before adding to allEvents
for (const event of relayEvents) {
if (!processedEventIds.has(event.id)) {
processedEventIds.add(event.id)
allEvents.push(event)
}
}
console.warn(`[PlatformSync] Relay ${relayUrl} completed: received ${eventCount} total events from relay, ${relayEvents.length} filtered with service='${PLATFORM_SERVICE}'`)
}
await new Promise<void>((resolve) => {
sub.on('event', (event: Event): void => {
eventCount++
// Log every 100th event to track progress (reduced frequency since we'll get more events)
if (eventCount % 100 === 0) {
console.warn(`[PlatformSync] Received ${eventCount} events from relay ${relayUrl} (client-side filtering in progress)`)
}
// Log target event for debugging (always log if we receive it)
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] ✅ Received target event from relay ${relayUrl} (event #${eventCount}):`, {
id: event.id,
created_at: event.created_at,
created_at_date: new Date(event.created_at * 1000).toISOString(),
pubkey: event.pubkey,
allTags: event.tags,
serviceTags: event.tags.filter((tag) => tag[0] === 'service'),
})
}
// Filter client-side: only process events with service='zapwall.fr'
const tags = extractTagsFromEvent(event)
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Extracted tags for target event:`, {
extractedTags: tags,
hasServiceTag: tags.service === PLATFORM_SERVICE,
serviceValue: tags.service,
expectedService: PLATFORM_SERVICE,
})
}
if (tags.service === PLATFORM_SERVICE) {
relayEvents.push(event)
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Target event accepted and added to relayEvents`)
}
} else {
// Log events that match filter but don't have service tag
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Event ${event.id} rejected: service tag is "${tags.service}", expected "${PLATFORM_SERVICE}"`)
}
}
})
sub.on('eose', (): void => {
console.warn(`[PlatformSync] Relay ${relayUrl} sent EOSE signal`)
finalize()
resolve()
})
// Timeout after SYNC_TIMEOUT_MS
const timeoutId = setTimeout((): void => {
console.warn(`[PlatformSync] Relay ${relayUrl} timeout after ${this.SYNC_TIMEOUT_MS}ms`)
finalize()
resolve()
}, this.SYNC_TIMEOUT_MS)
timeoutId.unref?.()
this.syncSubscription = sub
})
// Update progress after each relay
syncProgressManager.setProgress({
currentStep: i + 1,
totalSteps: activeRelays.length,
completed: false,
currentRelay: relayUrl,
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
console.warn(`[PlatformSync] Relay ${relayUrl} failed: ${errorMessage}`)
// Mark relay as failed but continue with next relay
relaySessionManager.markRelayFailed(relayUrl)
// Update progress even on failure
syncProgressManager.setProgress({
currentStep: i + 1,
totalSteps: activeRelays.length,
completed: false,
currentRelay: relayUrl,
})
}
}
// Process all collected events
await this.processAndCacheEvents(allEvents)
console.warn(`[PlatformSync] Total events collected from all relays: ${allEvents.length}`)
this.lastSyncTime = Date.now()
}
/**
* Process events and cache them by type
*/
private async processAndCacheEvents(events: Event[]): Promise<void> {
for (const event of events) {
try {
await this.processEvent(event)
} catch (error) {
console.error('Error processing event:', error, event.id)
// Continue processing other events even if one fails
}
}
}
/**
* Process a single event and cache it
*/
private async processEvent(event: Event): Promise<void> {
const tags = extractTagsFromEvent(event)
// Log target event for debugging
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Processing target event:`, {
id: event.id,
type: tags.type,
hidden: tags.hidden,
service: tags.service,
version: tags.version,
})
}
// Skip hidden events
if (tags.hidden) {
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Target event skipped: hidden=${tags.hidden}`)
}
return
}
// Try to parse and cache by type
if (tags.type === 'author') {
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Attempting to parse target event as author presentation`)
}
const parsed = await parsePresentationEvent(event)
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] parsePresentationEvent result for target event:`, {
parsed: parsed !== null,
hasHash: parsed?.hash !== undefined,
hash: parsed?.hash,
})
}
if (parsed && parsed.hash) {
await objectCache.set('author', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index)
if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Target event cached successfully as author with hash:`, parsed.hash)
}
} else if (event.id === '527d83e0af20bf23c3e104974090ccc21536ece72c24eb784b3642890f63b763') {
console.warn(`[PlatformSync] Target event NOT cached: parsed=${parsed !== null}, hasHash=${parsed?.hash !== undefined}`)
}
} else if (tags.type === 'series') {
const parsed = await parseSeriesFromEvent(event)
if (parsed && parsed.hash) {
await objectCache.set('series', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index)
}
} else if (tags.type === 'publication') {
const parsed = await parseArticleFromEvent(event)
if (parsed && parsed.hash) {
await objectCache.set('publication', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index)
}
} else if (tags.type === 'quote') {
const parsed = await parseReviewFromEvent(event)
if (parsed && parsed.hash) {
await objectCache.set('review', parsed.hash, event, parsed, tags.version ?? 0, tags.hidden, parsed.index)
}
} else if (event.kind === 9735) {
// Zap receipts (kind 9735) can be sponsoring, purchase, or review_tip
const sponsoring = await parseSponsoringFromEvent(event)
if (sponsoring && sponsoring.hash) {
await objectCache.set('sponsoring', sponsoring.hash, event, sponsoring, 0, false, sponsoring.index)
} else {
const purchase = await parsePurchaseFromEvent(event)
if (purchase && purchase.hash) {
await objectCache.set('purchase', purchase.hash, event, purchase, 0, false, purchase.index)
} else {
const reviewTip = await parseReviewTipFromEvent(event)
if (reviewTip && reviewTip.hash) {
await objectCache.set('review_tip', reviewTip.hash, event, reviewTip, 0, false, reviewTip.index)
}
}
}
}
}
/**
* Start continuous sync (runs periodically)
*/
startContinuousSync(): void {
// Start initial sync
void this.startSync()
// Schedule periodic syncs
setInterval(() => {
if (!this.syncInProgress) {
void this.startSync()
}
}, this.SYNC_INTERVAL_MS)
}
/**
* Stop sync
*/
stopSync(): void {
if (this.syncSubscription) {
this.syncSubscription.unsub()
this.syncSubscription = null
}
this.syncInProgress = false
}
/**
* Check if sync is in progress
*/
isSyncing(): boolean {
return this.syncInProgress
}
/**
* Get last sync time
*/
getLastSyncTime(): number {
return this.lastSyncTime
}
}
export const platformSyncService = new PlatformSyncService()