/**
 * Worker service for republishing unpublished objects to relays.
 * Continuously attempts to publish objects with published === false.
 */

import { objectCache, type ObjectType } from './objectCache'
import { relaySessionManager } from './relaySessionManager'
import { publishLog } from './publishLog'
import { writeService } from './writeService'

/** Period of the setInterval fallback loop that re-scans for unpublished objects (30 seconds). */
const REPUBLISH_INTERVAL_MS = 30000

/** Number of failed publish attempts after which an object's retry entry is dropped. */
const MAX_RETRIES_PER_OBJECT = 10

/** Minimum time between two publish attempts for the same object (5 seconds). */
const RETRY_DELAY_MS = 5000

/**
 * Retry bookkeeping for one cached object that still has to be published.
 * Entries are stored in the worker's map under the key `${objectType}:${id}`.
 */
interface UnpublishedObject {
  /** Cache category the object belongs to. */
  objectType: ObjectType
  /** Identifier of the object within its category. */
  id: string
  /** The nostr event that is handed to the relays when publishing. */
  event: import('nostr-tools').Event
  /** Number of failed publish attempts recorded so far. */
  retryCount: number
  /** `Date.now()` timestamp of the last scan that selected the object or of its last failed attempt. */
  lastRetryAt: number
}

/**
 * Background worker that keeps trying to publish cached objects which are
 * still reported as unpublished by `objectCache`.
 *
 * Retry state is kept in memory only, in a map keyed by `${objectType}:${id}`.
 * Scans are driven either by the Service Worker (when `swClient` reports it is
 * ready) or by a local `setInterval` fallback.
 */
class PublishWorkerService {
  /** True between a call to `start()` and the matching `stop()`. */
  private isRunning = false

  /** Handle of the fallback interval, or null when no interval is installed. */
  private intervalId: ReturnType<typeof setInterval> | null = null

  /** Retry bookkeeping for every object currently known to be unpublished. */
  private unpublishedObjects: Map<string, UnpublishedObject> = new Map()

  /** Re-entrancy guard so that only one scan runs at a time. */
  private processing = false

  /**
   * Start the publish worker.
   *
   * In a window context the Service Worker is used when it is ready; otherwise
   * (or if that fails) the worker falls back to a `setInterval` loop. In both
   * cases one scan is triggered immediately. Calling `start()` while already
   * running is a no-op.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      return
    }

    this.isRunning = true
    console.warn('[PublishWorker] Starting publish worker')

    // Try to use Service Worker for background processing
    if (typeof window !== 'undefined') {
      try {
        const { swClient } = await import('./swClient')
        const isReady = await swClient.isReady()

        // stop() may have been called while the awaits above were pending;
        // in that case do not start anything.
        if (isReady && this.isRunning) {
          console.warn('[PublishWorker] Using Service Worker for background processing')
          await swClient.startPublishWorker()

          if (!this.isRunning) {
            // Stopped while the Service Worker was starting: undo the start.
            await swClient.stopPublishWorker()
            return
          }

          // Still process immediately in main thread
          void this.processUnpublished()
          return
        }
      } catch (error) {
        console.warn('[PublishWorker] Service Worker not available, using setInterval:', error)
      }
    }

    // Do not install the fallback loop if stop() ran in the meantime.
    if (!this.isRunning) {
      return
    }

    // Fallback to setInterval if Service Worker not available:
    // process once right away, then periodically.
    void this.processUnpublished()

    // Never stack a second interval on top of an existing one.
    if (this.intervalId === null) {
      this.intervalId = setInterval(() => {
        void this.processUnpublished()
      }, REPUBLISH_INTERVAL_MS)
    }
  }

  /**
   * Stop the publish worker.
   *
   * Clears the local interval and asks the Service Worker (when ready) to stop
   * its publish worker. Stopping the Service Worker is best effort: failures
   * are logged and otherwise ignored. Calling `stop()` while not running is a
   * no-op.
   */
  async stop(): Promise<void> {
    if (!this.isRunning) {
      return
    }

    this.isRunning = false

    // Stop the local interval first (synchronously) so no further ticks fire
    // while the Service Worker shutdown below is awaited.
    if (this.intervalId !== null) {
      clearInterval(this.intervalId)
      this.intervalId = null
    }

    // Stop Service Worker if active
    if (typeof window !== 'undefined') {
      try {
        const { swClient } = await import('./swClient')
        const isReady = await swClient.isReady()
        // Skip the shutdown if start() was called again while awaiting.
        if (isReady && !this.isRunning) {
          await swClient.stopPublishWorker()
        }
      } catch (error) {
        // Best effort: a failing Service Worker must not prevent stopping.
        console.warn('[PublishWorker] Failed to stop Service Worker publish worker:', error)
      }
    }

    console.warn('[PublishWorker] Stopped publish worker')
  }

  /**
   * Scan the cache for unpublished objects and attempt to publish the ones
   * that are due.
   *
   * An object is skipped when its last attempt is more recent than
   * `RETRY_DELAY_MS`. Entries whose object is no longer reported as
   * unpublished are dropped. Overlapping calls return immediately while a scan
   * is in progress. Errors are logged and never thrown.
   *
   * Made public for Service Worker access.
   */
  async processUnpublished(): Promise<void> {
    if (this.processing) {
      return
    }

    this.processing = true

    try {
      // Load unpublished objects from all object types
      const objectTypes: ObjectType[] = ['author', 'series', 'publication', 'review', 'purchase', 'sponsoring', 'review_tip', 'payment_note']

      // Keys reported by the cache during this scan (used for pruning below).
      const seenKeys = new Set<string>()
      // Only the objects selected here are published in this pass, so that
      // throttled or exhausted objects are really skipped.
      const dueObjects: UnpublishedObject[] = []

      for (const objectType of objectTypes) {
        const unpublished = await objectCache.getUnpublished(objectType)

        for (const { id, event } of unpublished) {
          const key = `${objectType}:${id}`
          seenKeys.add(key)

          const existing = this.unpublishedObjects.get(key)
          const now = Date.now()

          // Skip if recently retried
          if (existing && now - existing.lastRetryAt < RETRY_DELAY_MS) {
            continue
          }

          // Skip if max retries reached
          // NOTE(review): entries are deleted as soon as they reach the limit
          // (see recordFailure), so the next scan re-creates them with a fresh
          // count and this branch is not reached in practice. Confirm whether
          // the limit is meant to be permanent before changing it.
          if (existing && existing.retryCount >= MAX_RETRIES_PER_OBJECT) {
            console.warn(`[PublishWorker] Max retries reached for ${objectType}:${id}, skipping`)
            continue
          }

          // Add or update in map
          const entry: UnpublishedObject = {
            objectType,
            id,
            event,
            retryCount: existing?.retryCount ?? 0,
            lastRetryAt: now,
          }
          this.unpublishedObjects.set(key, entry)
          dueObjects.push(entry)
        }
      }

      // Drop bookkeeping for objects the cache no longer reports as
      // unpublished; they must not be republished.
      for (const key of Array.from(this.unpublishedObjects.keys())) {
        if (!seenKeys.has(key)) {
          this.unpublishedObjects.delete(key)
        }
      }

      // Publish sequentially, one object at a time
      for (const obj of dueObjects) {
        await this.attemptPublish(obj)
      }
    } catch (error) {
      console.error('[PublishWorker] Error processing unpublished objects:', error)
    } finally {
      this.processing = false
    }
  }

  /**
   * Attempt to publish one unpublished object to every active relay.
   *
   * Uses websocketService to route events to the Service Worker. When no relay
   * is active the primary relay from the config is used instead. Each
   * per-relay outcome is written to `publishLog`; failing relays are reported
   * to `relaySessionManager`. If at least one relay succeeds the object is
   * marked as published via `writeService` and removed from the map; otherwise
   * a failure is recorded. Errors are logged and counted as a failure.
   *
   * @param obj Retry entry of the object to publish; mutated on failure.
   */
  private async attemptPublish(obj: UnpublishedObject): Promise<void> {
    const key = `${obj.objectType}:${obj.id}`

    try {
      const { websocketService } = await import('./websocketService')

      // Work on a copy: the fallback push below must not modify the list
      // owned by relaySessionManager.
      const activeRelays = [...(await relaySessionManager.getActiveRelays())]
      if (activeRelays.length === 0) {
        const { getPrimaryRelaySync } = await import('./config')
        activeRelays.push(getPrimaryRelaySync())
      }

      console.warn(`[PublishWorker] Attempting to publish ${obj.objectType}:${obj.id} to ${activeRelays.length} relay(s)`)

      // Publish to all active relays via websocketService (routes to Service Worker)
      const statuses = await websocketService.publishEvent(obj.event, activeRelays)

      const successfulRelays: string[] = []
      // statuses[i] is matched to activeRelays[i] by position.
      statuses.forEach((status, index) => {
        const relayUrl = activeRelays[index]
        if (!relayUrl) {
          return
        }

        if (status.success) {
          successfulRelays.push(relayUrl)
          // Log successful publication
          void publishLog.logPublication(obj.event.id, relayUrl, true, undefined, obj.objectType, obj.id)
        } else {
          const errorMessage = status.error ?? 'Unknown error'
          console.warn(`[PublishWorker] Relay ${relayUrl} failed for ${obj.objectType}:${obj.id}:`, errorMessage)
          relaySessionManager.markRelayFailed(relayUrl)
          // Log failed publication
          void publishLog.logPublication(obj.event.id, relayUrl, false, errorMessage, obj.objectType, obj.id)
        }
      })

      // Update published status via writeService
      if (successfulRelays.length > 0) {
        await writeService.updatePublished(obj.objectType, obj.id, successfulRelays)
        console.warn(`[PublishWorker] Successfully published ${obj.objectType}:${obj.id} to ${successfulRelays.length} relay(s)`)
        // Remove from unpublished map
        this.unpublishedObjects.delete(key)
      } else {
        // All relays failed, increment retry count
        this.recordFailure(obj)
        console.warn(`[PublishWorker] All relays failed for ${obj.objectType}:${obj.id}, retry count: ${obj.retryCount}/${MAX_RETRIES_PER_OBJECT}`)
      }
    } catch (error) {
      console.error(`[PublishWorker] Error publishing ${obj.objectType}:${obj.id}:`, error)
      // Increment retry count on error
      this.recordFailure(obj)
    }
  }

  /**
   * Record one failed attempt on `obj`: bump its retry count, stamp the
   * attempt time, and remove its map entry once the retry limit is reached.
   */
  private recordFailure(obj: UnpublishedObject): void {
    obj.retryCount++
    obj.lastRetryAt = Date.now()

    if (obj.retryCount >= MAX_RETRIES_PER_OBJECT) {
      this.unpublishedObjects.delete(`${obj.objectType}:${obj.id}`)
    }
  }
}

/** Shared module-level instance of the publish worker service. */
export const publishWorker = new PublishWorkerService()