story-research-zapwall/lib/publishWorker.ts
2026-01-08 23:04:56 +01:00

223 lines
7.3 KiB
TypeScript

/**
* Worker service for republishing unpublished objects to relays
* Continuously attempts to publish objects with published === false
*/
import { objectCache, type ObjectType } from './objectCache'
import { relaySessionManager } from './relaySessionManager'
import { publishLog } from './publishLog'
import { writeService } from './writeService'
/** Period of the fallback republish timer, in milliseconds (30 seconds). */
const REPUBLISH_INTERVAL_MS = 30 * 1000
/** Failure-count threshold checked for each tracked object. */
const MAX_RETRIES_PER_OBJECT = 10
/** Window, in milliseconds, during which an object counts as recently retried (5 seconds). */
const RETRY_DELAY_MS = 5 * 1000
/**
 * In-memory retry bookkeeping for one cached object that is still awaiting
 * publication to a relay.
 */
interface UnpublishedObject {
/** Cache category of the object; combined with `id` as `${objectType}:${id}` to key the tracking map. */
objectType: ObjectType
/** Identifier of the object within its category. */
id: string
/** The Nostr event that is sent to the relays for this object. */
event: import('nostr-tools').Event
/** Number of publish attempts that ended with no relay accepting the event (or with an exception). */
retryCount: number
/** `Date.now()` timestamp (ms) of the last time this entry was queued or failed. */
lastRetryAt: number
}
/**
 * Background worker that retries publication of cached objects that are still
 * flagged as unpublished.
 *
 * Retry state is kept in memory only. Every tracked object carries a failure
 * count and the time of its last attempt. Once an object has failed
 * MAX_RETRIES_PER_OBJECT times its entry is kept as a marker so later scans
 * skip it instead of restarting the count; the marker is dropped when the
 * object leaves the unpublished set or when the worker is started again.
 */
class PublishWorkerService {
  /** Object types scanned on every pass. */
  private static readonly OBJECT_TYPES: readonly ObjectType[] = [
    'author',
    'series',
    'publication',
    'review',
    'purchase',
    'sponsoring',
    'review_tip',
    'payment_note',
  ]

  private isRunning = false
  private intervalId: ReturnType<typeof setInterval> | null = null
  private unpublishedObjects: Map<string, UnpublishedObject> = new Map()
  private processing = false

  /** Builds the tracking-map key for an object. */
  private static keyOf(objectType: ObjectType, id: string): string {
    return `${objectType}:${id}`
  }

  /**
   * Start the publish worker.
   *
   * In a browser context the Service Worker is asked to drive the periodic
   * work; when it is missing or not ready, a local `setInterval` is used
   * instead. In both cases one pass is triggered immediately. Calling
   * `start()` while already running is a no-op.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      return
    }
    this.isRunning = true
    // A (re)start grants every object a fresh retry budget.
    this.unpublishedObjects.clear()
    console.warn('[PublishWorker] Starting publish worker')

    // Try to use Service Worker for background processing
    if (typeof window !== 'undefined') {
      try {
        const { swClient } = await import('./swClient')
        const isReady = await swClient.isReady()
        if (isReady) {
          console.warn('[PublishWorker] Using Service Worker for background processing')
          await swClient.startPublishWorker()
          // Still process immediately in main thread
          void this.processUnpublished()
          return
        }
      } catch (error) {
        console.warn('[PublishWorker] Service Worker not available, using setInterval:', error)
      }
    }

    // stop() may have completed while the Service Worker probe was awaited;
    // installing a timer now would leak it because stop() already ran.
    if (!this.isRunning) {
      return
    }

    // Fallback: process immediately, then periodically.
    void this.processUnpublished()
    if (this.intervalId === null) {
      this.intervalId = setInterval(() => {
        void this.processUnpublished()
      }, REPUBLISH_INTERVAL_MS)
    }
  }

  /**
   * Stop the publish worker.
   *
   * Clears the local timer first so no new pass is scheduled while the
   * Service Worker is being asked to stop. Service Worker errors are logged
   * and otherwise ignored (best effort). Calling `stop()` while not running
   * is a no-op.
   */
  async stop(): Promise<void> {
    if (!this.isRunning) {
      return
    }
    this.isRunning = false

    // Stop local interval
    if (this.intervalId !== null) {
      clearInterval(this.intervalId)
      this.intervalId = null
    }

    // Stop Service Worker if active
    if (typeof window !== 'undefined') {
      try {
        const { swClient } = await import('./swClient')
        const isReady = await swClient.isReady()
        if (isReady) {
          await swClient.stopPublishWorker()
        }
      } catch (error) {
        // Best effort: the worker is considered stopped either way.
        console.warn('[PublishWorker] Failed to stop Service Worker publish worker:', error)
      }
    }
    console.warn('[PublishWorker] Stopped publish worker')
  }

  /**
   * Run one republish pass over every unpublished object in the cache.
   * Made public for Service Worker access.
   *
   * Objects that already used up their retry budget, or that were attempted
   * less than RETRY_DELAY_MS ago, are skipped for this pass. Overlapping
   * calls are ignored while a pass is in progress. Errors are logged, never
   * thrown.
   */
  async processUnpublished(): Promise<void> {
    if (this.processing) {
      return
    }
    this.processing = true
    try {
      const stillUnpublished = new Set<string>()
      const due: UnpublishedObject[] = []

      for (const objectType of PublishWorkerService.OBJECT_TYPES) {
        const unpublished = await objectCache.getUnpublished(objectType)
        for (const { id, event } of unpublished) {
          const key = PublishWorkerService.keyOf(objectType, id)
          stillUnpublished.add(key)
          const existing = this.unpublishedObjects.get(key)

          if (existing !== undefined && existing.retryCount >= MAX_RETRIES_PER_OBJECT) {
            console.warn(`[PublishWorker] Max retries reached for ${objectType}:${id}, skipping`)
            continue
          }
          if (existing !== undefined && Date.now() - existing.lastRetryAt < RETRY_DELAY_MS) {
            // Attempted too recently; leave it for a later pass.
            continue
          }

          // Add or refresh the entry (the cached event may have changed),
          // carrying over the failures recorded so far.
          const entry: UnpublishedObject = {
            objectType,
            id,
            event,
            retryCount: existing?.retryCount ?? 0,
            lastRetryAt: Date.now(),
          }
          this.unpublishedObjects.set(key, entry)
          due.push(entry)
        }
      }

      // Forget objects the cache no longer reports as unpublished, so they are
      // not re-sent and the map cannot grow without bound.
      for (const key of Array.from(this.unpublishedObjects.keys())) {
        if (!stillUnpublished.has(key)) {
          this.unpublishedObjects.delete(key)
        }
      }

      // Attempt only the objects that are due in this pass.
      for (const obj of due) {
        await this.attemptPublish(obj)
      }
    } catch (error) {
      console.error('[PublishWorker] Error processing unpublished objects:', error)
    } finally {
      this.processing = false
    }
  }

  /**
   * Attempt to publish one object to every active relay.
   * Uses websocketService to route events to the Service Worker.
   *
   * When at least one relay accepts the event, the published status is
   * updated through writeService and the object stops being tracked.
   * Otherwise (all relays failed, or an exception occurred) a failure is
   * recorded against the object.
   */
  private async attemptPublish(obj: UnpublishedObject): Promise<void> {
    try {
      const { websocketService } = await import('./websocketService')
      // Copy the list so the fallback push below never mutates an array owned
      // by the relay session manager.
      const activeRelays = [...(await relaySessionManager.getActiveRelays())]
      if (activeRelays.length === 0) {
        const { getPrimaryRelaySync } = await import('./config')
        activeRelays.push(getPrimaryRelaySync())
      }
      console.warn(`[PublishWorker] Attempting to publish ${obj.objectType}:${obj.id} to ${activeRelays.length} relay(s)`)

      // Publish to all active relays via websocketService (routes to Service Worker)
      const statuses = await websocketService.publishEvent(obj.event, activeRelays)
      const successfulRelays: string[] = []
      statuses.forEach((status, index) => {
        // Statuses are matched to relays by position.
        const relayUrl = activeRelays[index]
        if (!relayUrl) {
          return
        }
        if (status.success) {
          successfulRelays.push(relayUrl)
          // Log successful publication
          void publishLog.logPublication(obj.event.id, relayUrl, true, undefined, obj.objectType, obj.id)
        } else {
          const errorMessage = status.error ?? 'Unknown error'
          console.warn(`[PublishWorker] Relay ${relayUrl} failed for ${obj.objectType}:${obj.id}:`, errorMessage)
          relaySessionManager.markRelayFailed(relayUrl)
          // Log failed publication
          void publishLog.logPublication(obj.event.id, relayUrl, false, errorMessage, obj.objectType, obj.id)
        }
      })

      if (successfulRelays.length > 0) {
        // Update published status via writeService
        await writeService.updatePublished(obj.objectType, obj.id, successfulRelays)
        console.warn(`[PublishWorker] Successfully published ${obj.objectType}:${obj.id} to ${successfulRelays.length} relay(s)`)
        // Remove from unpublished map
        this.unpublishedObjects.delete(PublishWorkerService.keyOf(obj.objectType, obj.id))
      } else {
        // All relays failed
        this.recordFailure(obj)
        console.warn(`[PublishWorker] All relays failed for ${obj.objectType}:${obj.id}, retry count: ${obj.retryCount}/${MAX_RETRIES_PER_OBJECT}`)
      }
    } catch (error) {
      console.error(`[PublishWorker] Error publishing ${obj.objectType}:${obj.id}:`, error)
      this.recordFailure(obj)
    }
  }

  /**
   * Record one failed attempt on a tracked object.
   *
   * The entry is intentionally NOT removed when the budget is exhausted:
   * removing it would make the next scan re-add the object with a zero
   * count, which silently disables the retry limit.
   */
  private recordFailure(obj: UnpublishedObject): void {
    obj.retryCount++
    obj.lastRetryAt = Date.now()
    if (obj.retryCount >= MAX_RETRIES_PER_OBJECT) {
      console.warn(`[PublishWorker] Giving up on ${obj.objectType}:${obj.id} after ${obj.retryCount} failed attempts`)
    }
  }
}
/** Module-level PublishWorkerService instance exported for importers of this module. */
export const publishWorker = new PublishWorkerService()