// 2026-01-13 14:49:19 +01:00
//
// 227 lines
// 7.9 KiB
// TypeScript

import type { ObjectType } from '../objectCache'
import { isWebWorkerAvailable } from './env'
import type { CreateNotificationParams, LogPublicationParams, WriteObjectParams } from './types'
import { decideCreateNotificationWorkerMessage } from './notificationDecision'
import { createNotificationDirect, logPublicationDirect, updatePublishedDirect, writeObjectDirect } from './direct'
import { isRecord, isWorkerErrorForOperation, isWorkerMessageEnvelope, readWorkerErrorData } from './workerMessage'
import { buildCreateNotificationWorkerPayload, buildWriteObjectWorkerPayload } from './workerPayloads'
/** Outcome of inspecting one worker message while a request is waiting for its reply. */
type WorkerReplyOutcome =
  | { status: 'ignore' }
  | { status: 'resolve' }
  | { status: 'reject'; message: string }

/**
 * Performs write operations through a dedicated Web Worker when one can be
 * started, and through the `*Direct` implementations otherwise.
 *
 * The worker is created lazily on the first operation. If Web Workers are
 * unavailable, the worker cannot be constructed, it emits an `error` event, or
 * it does not announce `WORKER_READY` in time, the service falls back to the
 * direct implementations.
 */
export class WriteService {
  /** How long to wait for the worker's `WORKER_READY` message before falling back. */
  private static readonly WORKER_READY_TIMEOUT_MS = 2000
  /** How long to wait for the worker's reply to a single request. */
  private static readonly WORKER_REPLY_TIMEOUT_MS = 10000

  private writeWorker: Worker | null = null
  private initPromise: Promise<void> | null = null

  /**
   * Ensures worker start-up has been attempted. Concurrent callers share the
   * same in-flight promise; once it settles, `writeWorker` is either a usable
   * worker or `null` (direct-write fallback).
   */
  private async init(): Promise<void> {
    if (this.writeWorker) {
      return
    }
    if (this.initPromise) {
      return this.initPromise
    }
    this.initPromise = this.createWorker()
    try {
      await this.initPromise
    } catch (error) {
      // Allow a later call to retry initialisation.
      this.initPromise = null
      throw error
    }
  }

  /** Wraps worker start-up in a promise that resolves for both the worker and the fallback outcome. */
  private createWorker(): Promise<void> {
    return new Promise((resolve) => this.createWorkerOrFallback(resolve))
  }

  /**
   * Starts the worker, or resolves immediately (leaving `writeWorker` unset)
   * when workers are unavailable or construction throws.
   */
  private createWorkerOrFallback(resolve: () => void): void {
    if (!isWebWorkerAvailable()) {
      console.warn('[WriteService] Web Workers not available, using direct writes')
      resolve()
      return
    }
    try {
      this.writeWorker = new Worker('/writeWorker.js', { type: 'classic' })
      this.registerWorkerListeners(this.writeWorker, resolve)
    } catch (error) {
      console.warn('[WriteService] Failed to create worker, using direct writes:', error)
      resolve()
    }
  }

  /**
   * Attaches the permanent logging listener plus the one-shot readiness
   * listeners to `worker`, and calls `resolve` once the worker is ready, has
   * errored, or has timed out.
   *
   * Every callback acts on the `worker` argument rather than on
   * `this.writeWorker`: after `terminate()` followed by a re-initialisation the
   * field may already point at a newer worker, which a stale callback must not
   * terminate or discard.
   */
  private registerWorkerListeners(worker: Worker, resolve: () => void): void {
    worker.addEventListener('message', (event: MessageEvent<unknown>) => {
      if (!isWorkerMessageEnvelope(event.data)) {
        console.error('[WriteService] Received invalid worker message envelope', { data: event.data })
        return
      }
      if (event.data.type === 'ERROR') {
        console.error('[WriteService] Worker error:', event.data.data)
      }
    })

    const readyHandler = (event: MessageEvent<unknown>): void => {
      if (isWorkerMessageEnvelope(event.data) && event.data.type === 'WORKER_READY') {
        clearTimeout(readyTimeout)
        worker.removeEventListener('message', readyHandler)
        resolve()
      }
    }

    /** Forgets `worker` only if it is still the service's current worker. */
    const discardIfCurrent = (): void => {
      if (this.writeWorker === worker) {
        this.writeWorker = null
      }
    }

    worker.addEventListener('error', (error) => {
      console.error('[WriteService] Worker error:', error)
      console.warn('[WriteService] Falling back to direct writes')
      // The readiness wait is over; drop its timer and listener so they cannot fire later.
      clearTimeout(readyTimeout)
      worker.removeEventListener('message', readyHandler)
      discardIfCurrent()
      resolve()
    })

    const readyTimeout = setTimeout(() => {
      console.warn('[WriteService] Worker ready timeout, using direct writes')
      worker.removeEventListener('message', readyHandler)
      worker.terminate()
      discardIfCurrent()
      resolve()
    }, WriteService.WORKER_READY_TIMEOUT_MS)

    worker.addEventListener('message', readyHandler)
  }

  /**
   * Writes an object via the worker when available, otherwise directly.
   *
   * @throws Re-throws any failure after logging it.
   */
  async writeObject(params: WriteObjectParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not just returned) so a worker rejection reaches the catch below.
        await this.postWriteObjectToWorker(params)
        return
      }
      await writeObjectDirect(params)
    } catch (error) {
      console.error('[WriteService] Error writing object:', error)
      throw error
    }
  }

  /**
   * Updates the published state of an object via the worker when available,
   * otherwise directly.
   *
   * @throws Re-throws any failure after logging it.
   */
  async updatePublished(objectType: ObjectType, id: string, published: false | string[]): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not just returned) so a worker rejection reaches the catch below.
        await this.postUpdatePublishedToWorker({ objectType, id, published })
        return
      }
      await updatePublishedDirect({ objectType, id, published })
    } catch (error) {
      console.error('[WriteService] Error updating published status:', error)
      throw error
    }
  }

  /**
   * Creates a notification via the worker when available, otherwise directly.
   *
   * @throws Re-throws any failure after logging it.
   */
  async createNotification(params: CreateNotificationParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not just returned) so a worker rejection reaches the catch below.
        await this.postCreateNotificationToWorker(params)
        return
      }
      await createNotificationDirect(params)
    } catch (error) {
      console.error('[WriteService] Error creating notification:', error)
      throw error
    }
  }

  /**
   * Logs a publication. With a worker the message is posted without waiting
   * for a reply. Failures are logged and never propagated to the caller.
   */
  async logPublication(params: LogPublicationParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        this.writeWorker.postMessage({ type: 'LOG_PUBLICATION', data: { ...params } })
        return
      }
      await logPublicationDirect(params)
    } catch (logError) {
      console.error('[WriteService] Error logging publication:', logError)
    }
  }

  /** Terminates the active worker, if any, so that the next operation re-initialises. */
  terminate(): void {
    if (this.writeWorker) {
      this.writeWorker.terminate()
      this.writeWorker = null
      this.initPromise = null
    }
  }

  /**
   * Posts `request.message` to the current worker and settles once
   * `request.interpret` classifies an incoming message as the reply, or once
   * the reply timeout elapses.
   *
   * The timer and the message listener are released on every exit path
   * (reply, timeout, or `postMessage` throwing).
   */
  private awaitWorkerReply(request: {
    message: unknown
    timeoutMessage: string
    interpret: (event: MessageEvent) => WorkerReplyOutcome
  }): Promise<void> {
    // Pin the worker so clean-up targets the instance the listener was added to,
    // even if `this.writeWorker` is cleared or replaced while the request is pending.
    const worker = this.writeWorker
    if (!worker) {
      return Promise.reject(new Error('Write worker is not available'))
    }
    return new Promise<void>((resolve, reject) => {
      const release = (): void => {
        clearTimeout(timeout)
        worker.removeEventListener('message', handler)
      }
      const handler = (event: MessageEvent): void => {
        const outcome = request.interpret(event)
        if (outcome.status === 'ignore') {
          return
        }
        release()
        if (outcome.status === 'resolve') {
          resolve()
        } else {
          reject(new Error(outcome.message))
        }
      }
      const timeout = setTimeout(() => {
        release()
        reject(new Error(request.timeoutMessage))
      }, WriteService.WORKER_REPLY_TIMEOUT_MS)

      worker.addEventListener('message', handler)
      try {
        worker.postMessage(request.message)
      } catch (error) {
        release()
        reject(error)
      }
    })
  }

  /**
   * Sends a write request to the worker. Resolves on a `WRITE_OBJECT_SUCCESS`
   * message carrying the same hash; rejects on an `ERROR` message whose
   * `originalType` is `WRITE_OBJECT`, or on timeout.
   */
  private postWriteObjectToWorker(params: WriteObjectParams): Promise<void> {
    const published = params.published ?? false
    const payload = buildWriteObjectWorkerPayload({ params, published })
    return this.awaitWorkerReply({
      message: payload,
      timeoutMessage: 'Write operation timeout',
      interpret: (event) => {
        if (!isWorkerMessageEnvelope(event.data)) {
          return { status: 'ignore' }
        }
        if (event.data.type === 'WRITE_OBJECT_SUCCESS' && isRecord(event.data.data) && event.data.data.hash === params.hash) {
          return { status: 'resolve' }
        }
        if (event.data.type !== 'ERROR') {
          return { status: 'ignore' }
        }
        const errorData = readWorkerErrorData(event.data.data)
        if (errorData.originalType !== 'WRITE_OBJECT') {
          return { status: 'ignore' }
        }
        return { status: 'reject', message: errorData.error ?? 'Write worker error' }
      },
    })
  }

  /**
   * Sends an update-published request to the worker. Resolves on an
   * `UPDATE_PUBLISHED_SUCCESS` message carrying the same id; rejects on an
   * `ERROR` message that `isWorkerErrorForOperation` attributes to
   * `UPDATE_PUBLISHED`, or on timeout.
   */
  private postUpdatePublishedToWorker(params: { objectType: ObjectType; id: string; published: false | string[] }): Promise<void> {
    return this.awaitWorkerReply({
      message: { type: 'UPDATE_PUBLISHED', data: { ...params } },
      timeoutMessage: 'Update published operation timeout',
      interpret: (event) => {
        if (!isWorkerMessageEnvelope(event.data)) {
          return { status: 'ignore' }
        }
        if (event.data.type === 'UPDATE_PUBLISHED_SUCCESS') {
          const matches = isRecord(event.data.data) && event.data.data.id === params.id
          return matches ? { status: 'resolve' } : { status: 'ignore' }
        }
        if (event.data.type !== 'ERROR') {
          return { status: 'ignore' }
        }
        const errorData = readWorkerErrorData(event.data.data)
        if (!isWorkerErrorForOperation(errorData, 'UPDATE_PUBLISHED')) {
          return { status: 'ignore' }
        }
        return { status: 'reject', message: errorData.error ?? 'Write worker error' }
      },
    })
  }

  /**
   * Sends a create-notification request to the worker. Whether an incoming
   * message resolves, rejects or is ignored is decided by
   * `decideCreateNotificationWorkerMessage`; rejects on timeout as well.
   */
  private postCreateNotificationToWorker(params: CreateNotificationParams): Promise<void> {
    return this.awaitWorkerReply({
      message: buildCreateNotificationWorkerPayload(params),
      timeoutMessage: 'Create notification operation timeout',
      interpret: (event) => {
        const decision = decideCreateNotificationWorkerMessage({ eventData: event.data, expectedEventId: params.eventId })
        if (decision.status === 'ignore') {
          return { status: 'ignore' }
        }
        if (decision.status === 'resolve') {
          return { status: 'resolve' }
        }
        return { status: 'reject', message: decision.message }
      },
    })
  }
}