/** * Worker service for republishing unpublished objects to relays * Continuously attempts to publish objects with published === false */ import { objectCache, type ObjectType } from './objectCache' import { relaySessionManager } from './relaySessionManager' import { publishLog } from './publishLog' import { writeService } from './writeService' import { getPublishRelays } from './relaySelection' const REPUBLISH_INTERVAL_MS = 30000 // 30 seconds const MAX_RETRIES_PER_OBJECT = 10 const RETRY_DELAY_MS = 5000 // 5 seconds between retries for same object interface UnpublishedObject { objectType: ObjectType id: string event: import('nostr-tools').Event retryCount: number lastRetryAt: number } class PublishWorkerService { private isRunning = false private intervalId: ReturnType | null = null private unpublishedObjects: Map = new Map() private processing = false /** * Start the publish worker * Can use Service Worker if available, otherwise falls back to setInterval */ async start(): Promise { if (this.isRunning) { return } this.isRunning = true console.warn('[PublishWorker] Starting publish worker') // Try to use Service Worker for background processing if (typeof window !== 'undefined') { try { const { swClient } = await import('./swClient') const isReady = await swClient.isReady() if (isReady) { console.warn('[PublishWorker] Using Service Worker for background processing') await swClient.startPublishWorker() // Still process immediately in main thread void this.processUnpublished() return } } catch (error) { console.warn('[PublishWorker] Service Worker not available, using setInterval:', error) } } // Fallback to setInterval if Service Worker not available // Start processing immediately void this.processUnpublished() // Then process periodically this.intervalId = setInterval(() => { void this.processUnpublished() }, REPUBLISH_INTERVAL_MS) } /** * Stop the publish worker */ async stop(): Promise { if (!this.isRunning) { return } this.isRunning = false // Stop Service Worker if active if (typeof window !== 'undefined') { try { const { swClient } = await import('./swClient') const isReady = await swClient.isReady() if (isReady) { await swClient.stopPublishWorker() } } catch { // Ignore errors } } // Stop local interval if (this.intervalId) { clearInterval(this.intervalId) this.intervalId = null } console.warn('[PublishWorker] Stopped publish worker') } /** * Process all unpublished objects * Made public for Service Worker access */ async processUnpublished(): Promise { if (this.processing) { return } this.processing = true try { const objectTypes = getAllPublishableObjectTypes() const now = Date.now() await this.refreshUnpublishedMap({ objectTypes, now }) await this.processQueuedObjects() } catch (error) { console.error('[PublishWorker] Error processing unpublished objects:', error) } finally { this.processing = false } } private async refreshUnpublishedMap(params: { objectTypes: ObjectType[]; now: number }): Promise { for (const objectType of params.objectTypes) { const unpublished = await objectCache.getUnpublished(objectType) this.upsertUnpublishedObjects({ objectType, unpublished, now: params.now }) } } private upsertUnpublishedObjects(params: { objectType: ObjectType unpublished: Array<{ id: string; event: import('nostr-tools').Event }> now: number }): void { for (const { id, event } of params.unpublished) { const key = buildUnpublishedKey(params.objectType, id) const existing = this.unpublishedObjects.get(key) if (existing && existing.retryCount >= MAX_RETRIES_PER_OBJECT) { console.warn(`[PublishWorker] Max retries reached for ${params.objectType}:${id}, skipping`) } const shouldSkip = Boolean(existing && params.now - existing.lastRetryAt < RETRY_DELAY_MS) if (!shouldSkip && (!existing || existing.retryCount < MAX_RETRIES_PER_OBJECT)) { this.unpublishedObjects.set(key, { objectType: params.objectType, id, event, retryCount: existing?.retryCount ?? 0, lastRetryAt: params.now, }) } } } private async processQueuedObjects(): Promise { const objectsToProcess = Array.from(this.unpublishedObjects.entries()) for (const [key, obj] of objectsToProcess) { await this.attemptPublish({ key, obj }) } } /** * Attempt to publish an unpublished object * Uses websocketService to route events to Service Worker */ private async attemptPublish(params: { key: string; obj: UnpublishedObject }): Promise { const { obj } = params try { const { websocketService } = await import('./websocketService') const relays = await getPublishRelays() console.warn(`[PublishWorker] Attempting to publish ${obj.objectType}:${obj.id} to ${relays.length} relay(s)`) const statuses = await websocketService.publishEvent(obj.event, relays) const successfulRelays = this.processPublishStatuses({ obj, relays, statuses }) await this.finalizePublishAttempt({ key: params.key, obj, successfulRelays }) } catch (error) { console.error(`[PublishWorker] Error publishing ${obj.objectType}:${obj.id}:`, error) this.incrementRetryOrRemove({ key: params.key, fallbackObj: params.obj }) } } private processPublishStatuses(params: { obj: UnpublishedObject relays: string[] statuses: { success: boolean; error?: string }[] }): string[] { const successfulRelays: string[] = [] params.statuses.forEach((status, index) => { const relayUrl = params.relays[index] if (!relayUrl) { return } if (status.success) { successfulRelays.push(relayUrl) void publishLog.logPublication({ eventId: params.obj.event.id, relayUrl, success: true, objectType: params.obj.objectType, objectId: params.obj.id }) return } const errorMessage = status.error ?? 'Unknown error' console.warn(`[PublishWorker] Relay ${relayUrl} failed for ${params.obj.objectType}:${params.obj.id}:`, errorMessage) relaySessionManager.markRelayFailed(relayUrl) void publishLog.logPublication({ eventId: params.obj.event.id, relayUrl, success: false, error: errorMessage, objectType: params.obj.objectType, objectId: params.obj.id }) }) return successfulRelays } private async finalizePublishAttempt(params: { key: string; obj: UnpublishedObject; successfulRelays: string[] }): Promise { if (params.successfulRelays.length > 0) { await writeService.updatePublished(params.obj.objectType, params.obj.id, params.successfulRelays) console.warn(`[PublishWorker] Successfully published ${params.obj.objectType}:${params.obj.id} to ${params.successfulRelays.length} relay(s)`) this.unpublishedObjects.delete(params.key) return } this.incrementRetryOrRemove({ key: params.key, fallbackObj: params.obj }) } private incrementRetryOrRemove(params: { key: string; fallbackObj: UnpublishedObject }): void { const current = this.unpublishedObjects.get(params.key) const base = current ?? params.fallbackObj const next = { ...base, retryCount: base.retryCount + 1, lastRetryAt: Date.now() } this.unpublishedObjects.set(params.key, next) console.warn(`[PublishWorker] All relays failed for ${next.objectType}:${next.id}, retry count: ${next.retryCount}/${MAX_RETRIES_PER_OBJECT}`) if (next.retryCount >= MAX_RETRIES_PER_OBJECT) { this.unpublishedObjects.delete(params.key) } } } export const publishWorker = new PublishWorkerService() function getAllPublishableObjectTypes(): ObjectType[] { return ['author', 'series', 'publication', 'review', 'purchase', 'sponsoring', 'review_tip', 'payment_note'] } function buildUnpublishedKey(objectType: ObjectType, id: string): string { return `${objectType}:${id}` }