237 lines
8.2 KiB
TypeScript
237 lines
8.2 KiB
TypeScript
/**
|
|
* Worker service for republishing unpublished objects to relays
|
|
* Continuously attempts to publish objects with published === false
|
|
*/
|
|
|
|
import { objectCache, type ObjectType } from './objectCache'
|
|
import { relaySessionManager } from './relaySessionManager'
|
|
import { publishLog } from './publishLog'
|
|
import { writeService } from './writeService'
|
|
import { getPublishRelays } from './relaySelection'
|
|
|
|
const REPUBLISH_INTERVAL_MS = 30000 // 30 seconds
|
|
const MAX_RETRIES_PER_OBJECT = 10
|
|
const RETRY_DELAY_MS = 5000 // 5 seconds between retries for same object
|
|
|
|
interface UnpublishedObject {
|
|
objectType: ObjectType
|
|
id: string
|
|
event: import('nostr-tools').Event
|
|
retryCount: number
|
|
lastRetryAt: number
|
|
}
|
|
|
|
class PublishWorkerService {
|
|
private isRunning = false
|
|
private intervalId: ReturnType<typeof setInterval> | null = null
|
|
private unpublishedObjects: Map<string, UnpublishedObject> = new Map()
|
|
private processing = false
|
|
|
|
/**
|
|
* Start the publish worker
|
|
* Can use Service Worker if available, otherwise falls back to setInterval
|
|
*/
|
|
async start(): Promise<void> {
|
|
if (this.isRunning) {
|
|
return
|
|
}
|
|
|
|
this.isRunning = true
|
|
console.warn('[PublishWorker] Starting publish worker')
|
|
|
|
// Try to use Service Worker for background processing
|
|
if (typeof window !== 'undefined') {
|
|
try {
|
|
const { swClient } = await import('./swClient')
|
|
const isReady = await swClient.isReady()
|
|
if (isReady) {
|
|
console.warn('[PublishWorker] Using Service Worker for background processing')
|
|
await swClient.startPublishWorker()
|
|
// Still process immediately in main thread
|
|
void this.processUnpublished()
|
|
return
|
|
}
|
|
} catch (error) {
|
|
console.warn('[PublishWorker] Service Worker not available, using setInterval:', error)
|
|
}
|
|
}
|
|
|
|
// Fallback to setInterval if Service Worker not available
|
|
// Start processing immediately
|
|
void this.processUnpublished()
|
|
|
|
// Then process periodically
|
|
this.intervalId = setInterval(() => {
|
|
void this.processUnpublished()
|
|
}, REPUBLISH_INTERVAL_MS)
|
|
}
|
|
|
|
/**
|
|
* Stop the publish worker
|
|
*/
|
|
async stop(): Promise<void> {
|
|
if (!this.isRunning) {
|
|
return
|
|
}
|
|
|
|
this.isRunning = false
|
|
|
|
// Stop Service Worker if active
|
|
if (typeof window !== 'undefined') {
|
|
try {
|
|
const { swClient } = await import('./swClient')
|
|
const isReady = await swClient.isReady()
|
|
if (isReady) {
|
|
await swClient.stopPublishWorker()
|
|
}
|
|
} catch {
|
|
// Ignore errors
|
|
}
|
|
}
|
|
|
|
// Stop local interval
|
|
if (this.intervalId) {
|
|
clearInterval(this.intervalId)
|
|
this.intervalId = null
|
|
}
|
|
console.warn('[PublishWorker] Stopped publish worker')
|
|
}
|
|
|
|
/**
|
|
* Process all unpublished objects
|
|
* Made public for Service Worker access
|
|
*/
|
|
async processUnpublished(): Promise<void> {
|
|
if (this.processing) {
|
|
return
|
|
}
|
|
|
|
this.processing = true
|
|
|
|
try {
|
|
const objectTypes = getAllPublishableObjectTypes()
|
|
const now = Date.now()
|
|
await this.refreshUnpublishedMap({ objectTypes, now })
|
|
await this.processQueuedObjects()
|
|
} catch (error) {
|
|
console.error('[PublishWorker] Error processing unpublished objects:', error)
|
|
} finally {
|
|
this.processing = false
|
|
}
|
|
}
|
|
|
|
private async refreshUnpublishedMap(params: { objectTypes: ObjectType[]; now: number }): Promise<void> {
|
|
for (const objectType of params.objectTypes) {
|
|
const unpublished = await objectCache.getUnpublished(objectType)
|
|
this.upsertUnpublishedObjects({ objectType, unpublished, now: params.now })
|
|
}
|
|
}
|
|
|
|
private upsertUnpublishedObjects(params: {
|
|
objectType: ObjectType
|
|
unpublished: Array<{ id: string; event: import('nostr-tools').Event }>
|
|
now: number
|
|
}): void {
|
|
for (const { id, event } of params.unpublished) {
|
|
const key = buildUnpublishedKey(params.objectType, id)
|
|
const existing = this.unpublishedObjects.get(key)
|
|
|
|
if (existing && existing.retryCount >= MAX_RETRIES_PER_OBJECT) {
|
|
console.warn(`[PublishWorker] Max retries reached for ${params.objectType}:${id}, skipping`)
|
|
}
|
|
|
|
const shouldSkip = Boolean(existing && params.now - existing.lastRetryAt < RETRY_DELAY_MS)
|
|
if (!shouldSkip && (!existing || existing.retryCount < MAX_RETRIES_PER_OBJECT)) {
|
|
this.unpublishedObjects.set(key, {
|
|
objectType: params.objectType,
|
|
id,
|
|
event,
|
|
retryCount: existing?.retryCount ?? 0,
|
|
lastRetryAt: params.now,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
private async processQueuedObjects(): Promise<void> {
|
|
const objectsToProcess = Array.from(this.unpublishedObjects.entries())
|
|
for (const [key, obj] of objectsToProcess) {
|
|
await this.attemptPublish({ key, obj })
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Attempt to publish an unpublished object
|
|
* Uses websocketService to route events to Service Worker
|
|
*/
|
|
private async attemptPublish(params: { key: string; obj: UnpublishedObject }): Promise<void> {
|
|
const { obj } = params
|
|
try {
|
|
const { websocketService } = await import('./websocketService')
|
|
const relays = await getPublishRelays()
|
|
console.warn(`[PublishWorker] Attempting to publish ${obj.objectType}:${obj.id} to ${relays.length} relay(s)`)
|
|
const statuses = await websocketService.publishEvent(obj.event, relays)
|
|
const successfulRelays = this.processPublishStatuses({ obj, relays, statuses })
|
|
await this.finalizePublishAttempt({ key: params.key, obj, successfulRelays })
|
|
} catch (error) {
|
|
console.error(`[PublishWorker] Error publishing ${obj.objectType}:${obj.id}:`, error)
|
|
this.incrementRetryOrRemove({ key: params.key, fallbackObj: params.obj })
|
|
}
|
|
}
|
|
|
|
private processPublishStatuses(params: {
|
|
obj: UnpublishedObject
|
|
relays: string[]
|
|
statuses: { success: boolean; error?: string }[]
|
|
}): string[] {
|
|
const successfulRelays: string[] = []
|
|
params.statuses.forEach((status, index) => {
|
|
const relayUrl = params.relays[index]
|
|
if (!relayUrl) {
|
|
return
|
|
}
|
|
if (status.success) {
|
|
successfulRelays.push(relayUrl)
|
|
void publishLog.logPublication({ eventId: params.obj.event.id, relayUrl, success: true, objectType: params.obj.objectType, objectId: params.obj.id })
|
|
return
|
|
}
|
|
const errorMessage = status.error ?? 'Unknown error'
|
|
console.warn(`[PublishWorker] Relay ${relayUrl} failed for ${params.obj.objectType}:${params.obj.id}:`, errorMessage)
|
|
relaySessionManager.markRelayFailed(relayUrl)
|
|
void publishLog.logPublication({ eventId: params.obj.event.id, relayUrl, success: false, error: errorMessage, objectType: params.obj.objectType, objectId: params.obj.id })
|
|
})
|
|
return successfulRelays
|
|
}
|
|
|
|
private async finalizePublishAttempt(params: { key: string; obj: UnpublishedObject; successfulRelays: string[] }): Promise<void> {
|
|
if (params.successfulRelays.length > 0) {
|
|
await writeService.updatePublished(params.obj.objectType, params.obj.id, params.successfulRelays)
|
|
console.warn(`[PublishWorker] Successfully published ${params.obj.objectType}:${params.obj.id} to ${params.successfulRelays.length} relay(s)`)
|
|
this.unpublishedObjects.delete(params.key)
|
|
return
|
|
}
|
|
this.incrementRetryOrRemove({ key: params.key, fallbackObj: params.obj })
|
|
}
|
|
|
|
private incrementRetryOrRemove(params: { key: string; fallbackObj: UnpublishedObject }): void {
|
|
const current = this.unpublishedObjects.get(params.key)
|
|
const base = current ?? params.fallbackObj
|
|
const next = { ...base, retryCount: base.retryCount + 1, lastRetryAt: Date.now() }
|
|
this.unpublishedObjects.set(params.key, next)
|
|
console.warn(`[PublishWorker] All relays failed for ${next.objectType}:${next.id}, retry count: ${next.retryCount}/${MAX_RETRIES_PER_OBJECT}`)
|
|
if (next.retryCount >= MAX_RETRIES_PER_OBJECT) {
|
|
this.unpublishedObjects.delete(params.key)
|
|
}
|
|
}
|
|
}
|
|
|
|
export const publishWorker = new PublishWorkerService()
|
|
|
|
function getAllPublishableObjectTypes(): ObjectType[] {
|
|
return ['author', 'series', 'publication', 'review', 'purchase', 'sponsoring', 'review_tip', 'payment_note']
|
|
}
|
|
|
|
function buildUnpublishedKey(objectType: ObjectType, id: string): string {
|
|
return `${objectType}:${id}`
|
|
}
|