story-research-zapwall/lib/publishWorker.ts
2026-01-10 09:41:57 +01:00

260 lines
8.5 KiB
TypeScript

/**
* Worker service for republishing unpublished objects to relays
* Continuously attempts to publish objects with published === false
*/
import { objectCache, type ObjectType } from './objectCache'
import { relaySessionManager } from './relaySessionManager'
import { publishLog } from './publishLog'
import { writeService } from './writeService'
const REPUBLISH_INTERVAL_MS = 30000 // 30 seconds
const MAX_RETRIES_PER_OBJECT = 10
const RETRY_DELAY_MS = 5000 // 5 seconds between retries for same object
interface UnpublishedObject {
objectType: ObjectType
id: string
event: import('nostr-tools').Event
retryCount: number
lastRetryAt: number
}
class PublishWorkerService {
private isRunning = false
private intervalId: ReturnType<typeof setInterval> | null = null
private unpublishedObjects: Map<string, UnpublishedObject> = new Map()
private processing = false
/**
* Start the publish worker
* Can use Service Worker if available, otherwise falls back to setInterval
*/
async start(): Promise<void> {
if (this.isRunning) {
return
}
this.isRunning = true
console.warn('[PublishWorker] Starting publish worker')
// Try to use Service Worker for background processing
if (typeof window !== 'undefined') {
try {
const { swClient } = await import('./swClient')
const isReady = await swClient.isReady()
if (isReady) {
console.warn('[PublishWorker] Using Service Worker for background processing')
await swClient.startPublishWorker()
// Still process immediately in main thread
void this.processUnpublished()
return
}
} catch (error) {
console.warn('[PublishWorker] Service Worker not available, using setInterval:', error)
}
}
// Fallback to setInterval if Service Worker not available
// Start processing immediately
void this.processUnpublished()
// Then process periodically
this.intervalId = setInterval(() => {
void this.processUnpublished()
}, REPUBLISH_INTERVAL_MS)
}
/**
* Stop the publish worker
*/
async stop(): Promise<void> {
if (!this.isRunning) {
return
}
this.isRunning = false
// Stop Service Worker if active
if (typeof window !== 'undefined') {
try {
const { swClient } = await import('./swClient')
const isReady = await swClient.isReady()
if (isReady) {
await swClient.stopPublishWorker()
}
} catch {
// Ignore errors
}
}
// Stop local interval
if (this.intervalId) {
clearInterval(this.intervalId)
this.intervalId = null
}
console.warn('[PublishWorker] Stopped publish worker')
}
/**
* Process all unpublished objects
* Made public for Service Worker access
*/
async processUnpublished(): Promise<void> {
if (this.processing) {
return
}
this.processing = true
try {
const objectTypes = getAllPublishableObjectTypes()
const now = Date.now()
await this.refreshUnpublishedMap({ objectTypes, now })
await this.processQueuedObjects()
} catch (error) {
console.error('[PublishWorker] Error processing unpublished objects:', error)
} finally {
this.processing = false
}
}
private async refreshUnpublishedMap(params: { objectTypes: ObjectType[]; now: number }): Promise<void> {
for (const objectType of params.objectTypes) {
const unpublished = await objectCache.getUnpublished(objectType)
this.upsertUnpublishedObjects({ objectType, unpublished, now: params.now })
}
}
private upsertUnpublishedObjects(params: {
objectType: ObjectType
unpublished: Array<{ id: string; event: import('nostr-tools').Event }>
now: number
}): void {
for (const { id, event } of params.unpublished) {
const key = buildUnpublishedKey(params.objectType, id)
const existing = this.unpublishedObjects.get(key)
if (existing && existing.retryCount >= MAX_RETRIES_PER_OBJECT) {
console.warn(`[PublishWorker] Max retries reached for ${params.objectType}:${id}, skipping`)
}
const shouldSkip = Boolean(existing && params.now - existing.lastRetryAt < RETRY_DELAY_MS)
if (!shouldSkip && (!existing || existing.retryCount < MAX_RETRIES_PER_OBJECT)) {
this.unpublishedObjects.set(key, {
objectType: params.objectType,
id,
event,
retryCount: existing?.retryCount ?? 0,
lastRetryAt: params.now,
})
}
}
}
private async processQueuedObjects(): Promise<void> {
const objectsToProcess = Array.from(this.unpublishedObjects.entries())
for (const [key, obj] of objectsToProcess) {
await this.attemptPublish({ key, obj })
}
}
/**
* Attempt to publish an unpublished object
* Uses websocketService to route events to Service Worker
*/
private async attemptPublish(params: { key: string; obj: UnpublishedObject }): Promise<void> {
const {obj} = params
try {
const { websocketService } = await import('./websocketService')
const activeRelays = await relaySessionManager.getActiveRelays()
if (activeRelays.length === 0) {
const { getPrimaryRelaySync } = await import('./config')
const relayUrl = getPrimaryRelaySync()
activeRelays.push(relayUrl)
}
console.warn(`[PublishWorker] Attempting to publish ${obj.objectType}:${obj.id} to ${activeRelays.length} relay(s)`)
// Publish to all active relays via websocketService (routes to Service Worker)
const statuses = await websocketService.publishEvent(obj.event, activeRelays)
const successfulRelays: string[] = []
statuses.forEach((status, index) => {
const relayUrl = activeRelays[index]
if (!relayUrl) {
return
}
if (status.success) {
successfulRelays.push(relayUrl)
// Log successful publication
void publishLog.logPublication({
eventId: obj.event.id,
relayUrl,
success: true,
objectType: obj.objectType,
objectId: obj.id,
})
} else {
const errorMessage = status.error ?? 'Unknown error'
console.warn(`[PublishWorker] Relay ${relayUrl} failed for ${obj.objectType}:${obj.id}:`, errorMessage)
relaySessionManager.markRelayFailed(relayUrl)
// Log failed publication
void publishLog.logPublication({
eventId: obj.event.id,
relayUrl,
success: false,
error: errorMessage,
objectType: obj.objectType,
objectId: obj.id,
})
}
})
// Update published status via writeService
if (successfulRelays.length > 0) {
await writeService.updatePublished(obj.objectType, obj.id, successfulRelays)
console.warn(`[PublishWorker] Successfully published ${obj.objectType}:${obj.id} to ${successfulRelays.length} relay(s)`)
// Remove from unpublished map
this.unpublishedObjects.delete(params.key)
} else {
const current = this.unpublishedObjects.get(params.key)
const next = current
? { ...current, retryCount: current.retryCount + 1, lastRetryAt: Date.now() }
: { ...obj, retryCount: obj.retryCount + 1, lastRetryAt: Date.now() }
this.unpublishedObjects.set(params.key, next)
console.warn(`[PublishWorker] All relays failed for ${obj.objectType}:${obj.id}, retry count: ${next.retryCount}/${MAX_RETRIES_PER_OBJECT}`)
// Remove if max retries reached
if (next.retryCount >= MAX_RETRIES_PER_OBJECT) {
this.unpublishedObjects.delete(params.key)
}
}
} catch (error) {
console.error(`[PublishWorker] Error publishing ${obj.objectType}:${obj.id}:`, error)
const current = this.unpublishedObjects.get(params.key)
const next = current
? { ...current, retryCount: current.retryCount + 1, lastRetryAt: Date.now() }
: { ...params.obj, retryCount: params.obj.retryCount + 1, lastRetryAt: Date.now() }
this.unpublishedObjects.set(params.key, next)
if (next.retryCount >= MAX_RETRIES_PER_OBJECT) {
this.unpublishedObjects.delete(params.key)
}
}
}
}
export const publishWorker = new PublishWorkerService()
function getAllPublishableObjectTypes(): ObjectType[] {
return ['author', 'series', 'publication', 'review', 'purchase', 'sponsoring', 'review_tip', 'payment_note']
}
function buildUnpublishedKey(objectType: ObjectType, id: string): string {
return `${objectType}:${id}`
}