story-research-zapwall/lib/writeService.ts
2026-01-10 10:50:47 +01:00

430 lines
13 KiB
TypeScript

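/**
* Write service - single entry point for all write operations to IndexedDB.
* Writes are routed through a Web Worker; when no worker can be used
* (Web Workers unavailable, worker creation failure, worker error or ready timeout)
* the service falls back to direct writes.
* Network operations are routed through the WebSocket service.
*/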
import type { NostrEvent } from 'nostr-tools'
import type { ObjectType } from './objectCache'
/**
* Input accepted by WriteService.writeObject.
* The same fields are forwarded to the worker (WRITE_OBJECT message) or to
* objectCache.set on the direct-write fallback.
*/
interface WriteObjectParams {
// Kind of object being stored (type declared in ./objectCache).
objectType: ObjectType
// Also used to match the worker's WRITE_OBJECT_SUCCESS response to this request.
hash: string
// The Nostr event associated with the object.
event: NostrEvent
// Parsed representation of the object; opaque to this service.
parsed: unknown
version: number
hidden: boolean
// Optional; omitted from the direct write when undefined.
index?: number
// Either `false` or a list of strings (presumably relay identifiers - TODO confirm).
// The worker path sends `false` when this is undefined.
published?: false | string[]
}
/**
* Input accepted by WriteService.createNotification.
*/
interface CreateNotificationParams {
// Notification type; cast to notificationService's own type on the direct-write fallback.
type: string
objectType: string
objectId: string
// Also used to match the worker's CREATE_NOTIFICATION_SUCCESS response to this request.
eventId: string
// Optional extra payload; sent to the worker under the key `notificationData`.
data?: Record<string, unknown>
}
/**
* Input accepted by WriteService.logPublication.
* Presumably describes the outcome of publishing one event to one relay - TODO confirm against callers.
*/
interface LogPublicationParams {
eventId: string
relayUrl: string
success: boolean
// Optional error description.
error?: string
objectType?: string
objectId?: string
}
/**
* Minimal shape every message received from the write worker must have.
*/
interface WorkerMessageEnvelope {
// Message tag, e.g. 'WORKER_READY', 'ERROR' or an '*_SUCCESS' type.
type: string
// Message payload; unvalidated, must be narrowed before use.
data?: unknown
}
/**
* Normalised view of the payload of a worker 'ERROR' message.
* Each field is undefined when the payload lacks it or it is not a string
* (see readWorkerErrorData).
*/
interface WorkerErrorData {
// Type of the request that failed, when the worker reported it.
originalType: string | undefined
// Task identifier; compared by prefix against the operation name in isWorkerErrorForOperation.
taskId: string | undefined
// Error message text.
error: string | undefined
}
/**
 * Type guard: true for any non-null value whose `typeof` is 'object'
 * (plain objects and arrays alike), so its properties can be read safely.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null) {
    return false
  }
  return typeof value === 'object'
}
/**
 * Type guard for messages coming from the write worker: the value must be
 * an object carrying a string `type` tag. The `data` payload is not inspected.
 */
function isWorkerMessageEnvelope(value: unknown): value is WorkerMessageEnvelope {
  return isRecord(value) && typeof value.type === 'string'
}
/**
 * Returns `value` unchanged when it is a string, otherwise `undefined`.
 */
function readString(value: unknown): string | undefined {
  if (typeof value !== 'string') {
    return undefined
  }
  return value
}
/**
 * Extracts the known fields of a worker 'ERROR' payload.
 * Fields that are missing or not strings come back as `undefined`;
 * a non-object payload yields an all-undefined result.
 */
function readWorkerErrorData(value: unknown): WorkerErrorData {
  // An empty record makes every lookup below fall through to `undefined`.
  const payload: Record<string, unknown> = isRecord(value) ? value : {}
  const originalType = readString(payload.originalType)
  const taskId = readString(payload.taskId)
  const error = readString(payload.error)
  return { originalType, taskId, error }
}
/**
 * Decides whether a worker error belongs to the given operation: either the
 * reported original type equals the operation name, or the task id starts with it.
 */
function isWorkerErrorForOperation(errorData: WorkerErrorData, operation: string): boolean {
  const sameType = errorData.originalType === operation
  // The task id is only consulted when the type did not already match.
  return sameType || (errorData.taskId?.startsWith(operation) ?? false)
}
/**
 * Describes one request/response round trip with the write worker.
 */
interface WorkerRequest {
  /** Message type posted to the worker; also used to recognise ERROR responses for this request. */
  operation: string
  /** Message type the worker answers with on success. */
  successType: string
  /** Value sent as the `data` field of the posted message. */
  payload: unknown
  /** Error message used when no matching response arrives in time. */
  timeoutMessage: string
  /** Tells whether a success payload answers this particular request. */
  matchesSuccess: (data: Record<string, unknown>) => boolean
}

/**
 * Routes IndexedDB writes through a dedicated Web Worker.
 *
 * The worker is created lazily on the first operation. When Web Workers are
 * unavailable, the worker cannot be created, it errors, or it does not report
 * WORKER_READY in time, every operation falls back to a direct write.
 */
class WriteService {
  /** Maximum time (ms) to wait for the worker's WORKER_READY message. */
  private static readonly READY_TIMEOUT_MS = 2000
  /** Maximum time (ms) to wait for the worker's answer to a single request. */
  private static readonly REQUEST_TIMEOUT_MS = 10000

  private writeWorker: Worker | null = null
  private initPromise: Promise<void> | null = null

  /**
   * Initialize the write worker (at most once; concurrent callers share the same promise).
   */
  private async init(): Promise<void> {
    // A pending initialisation takes priority over `writeWorker`: the worker
    // reference is assigned before the worker has reported WORKER_READY, so
    // concurrent callers must wait on the shared promise instead of using it early.
    if (this.initPromise) {
      return this.initPromise
    }
    if (this.writeWorker) {
      return
    }
    this.initPromise = this.createWorker()
    try {
      await this.initPromise
    } catch (error) {
      this.initPromise = null
      throw error
    }
  }

  /**
   * Resolves once the worker is ready or once the service has fallen back to direct writes.
   */
  private createWorker(): Promise<void> {
    return new Promise((resolve) => {
      this.createWorkerOrFallback(resolve)
    })
  }

  /**
   * Creates the worker and wires its listeners, or resolves immediately when
   * a worker cannot be used.
   */
  private createWorkerOrFallback(resolve: () => void): void {
    if (!isWebWorkerAvailable()) {
      console.warn('[WriteService] Web Workers not available, using direct writes')
      resolve()
      return
    }
    let worker: Worker | null = null
    try {
      worker = new Worker('/writeWorker.js', { type: 'classic' })
      this.writeWorker = worker
      this.registerWorkerListeners(worker, resolve)
    } catch (error) {
      console.warn('[WriteService] Failed to create worker, using direct writes:', error)
      // Do not keep a half-initialised worker around: the service is now in fallback mode.
      worker?.terminate()
      this.writeWorker = null
      resolve()
    }
  }

  /**
   * Attaches the logging, error and readiness listeners to a freshly created worker.
   * `resolve` is called exactly when initialisation is over (ready, error or timeout);
   * calling it more than once is harmless.
   */
  private registerWorkerListeners(worker: Worker, resolve: () => void): void {
    let isReady = false

    // Forget the worker only if it is still the current one, so that a stale
    // callback cannot discard a newer worker created after terminate().
    const dropWorker = (): void => {
      if (this.writeWorker === worker) {
        this.writeWorker = null
      }
    }

    worker.addEventListener('message', (event: MessageEvent<unknown>) => {
      if (!isWorkerMessageEnvelope(event.data)) {
        console.error('[WriteService] Received invalid worker message envelope', { data: event.data })
        return
      }
      if (event.data.type === 'ERROR') {
        console.error('[WriteService] Worker error:', event.data.data)
      }
    })

    const readyHandler = (event: MessageEvent<unknown>): void => {
      if (isWorkerMessageEnvelope(event.data) && event.data.type === 'WORKER_READY') {
        isReady = true
        clearTimeout(readyTimeout)
        worker.removeEventListener('message', readyHandler)
        resolve()
      }
    }

    const readyTimeout = setTimeout(() => {
      console.warn('[WriteService] Worker ready timeout, using direct writes')
      worker.removeEventListener('message', readyHandler)
      worker.terminate()
      dropWorker()
      resolve()
    }, WriteService.READY_TIMEOUT_MS)

    worker.addEventListener('error', (error) => {
      console.error('[WriteService] Worker error:', error)
      console.warn('[WriteService] Falling back to direct writes')
      // Stop waiting for readiness so no misleading timeout warning is logged later.
      clearTimeout(readyTimeout)
      worker.removeEventListener('message', readyHandler)
      if (!isReady) {
        // No request can be in flight before readiness, so the worker can be stopped safely.
        // After readiness it is left running so in-flight requests may still be answered.
        worker.terminate()
      }
      dropWorker()
      resolve()
    })

    worker.addEventListener('message', readyHandler)
  }

  /**
   * Posts one request to the worker and settles when the matching answer arrives.
   * Resolves on a matching success message; rejects on an ERROR message attributed
   * to the same operation, on timeout, or when the message cannot be posted.
   * Listener and timer are always released when the promise settles.
   */
  private requestWorker(request: WorkerRequest): Promise<void> {
    const worker = this.writeWorker
    if (!worker) {
      return Promise.reject(new Error('Write worker is not available'))
    }
    return new Promise<void>((resolve, reject) => {
      const release = (): void => {
        clearTimeout(timeout)
        worker.removeEventListener('message', handler)
      }
      const handler = (event: MessageEvent<unknown>): void => {
        const message = event.data
        if (!isWorkerMessageEnvelope(message)) {
          return
        }
        if (message.type === request.successType) {
          // Success messages for other requests of the same kind are ignored.
          if (isRecord(message.data) && request.matchesSuccess(message.data)) {
            release()
            resolve()
          }
          return
        }
        if (message.type !== 'ERROR') {
          return
        }
        const errorData = readWorkerErrorData(message.data)
        if (!isWorkerErrorForOperation(errorData, request.operation)) {
          return
        }
        release()
        reject(new Error(errorData.error ?? 'Write worker error'))
      }
      const timeout = setTimeout(() => {
        worker.removeEventListener('message', handler)
        reject(new Error(request.timeoutMessage))
      }, WriteService.REQUEST_TIMEOUT_MS)

      worker.addEventListener('message', handler)
      try {
        worker.postMessage({ type: request.operation, data: request.payload })
      } catch (error) {
        // postMessage throws synchronously when the payload cannot be cloned.
        release()
        reject(error)
      }
    })
  }

  /**
   * Write object to IndexedDB (via Web Worker, or directly in fallback mode).
   * Failures are logged and rethrown.
   */
  async writeObject(params: WriteObjectParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not returned) so that a worker-side rejection reaches the catch below.
        await this.postWriteObjectToWorker(params)
        return
      }
      await this.writeObjectDirect(params)
    } catch (error) {
      console.error('[WriteService] Error writing object:', error)
      throw error
    }
  }

  /**
   * Update published status (via Web Worker, or directly in fallback mode).
   * Failures are logged and rethrown.
   */
  async updatePublished(
    objectType: ObjectType,
    id: string,
    published: false | string[]
  ): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not returned) so that a worker-side rejection reaches the catch below.
        await this.postUpdatePublishedToWorker({ objectType, id, published })
        return
      }
      await this.updatePublishedDirect({ objectType, id, published })
    } catch (error) {
      console.error('[WriteService] Error updating published status:', error)
      throw error
    }
  }

  /**
   * Sends a WRITE_OBJECT request; the answer is matched on the object hash.
   */
  private postWriteObjectToWorker(params: WriteObjectParams): Promise<void> {
    return this.requestWorker({
      operation: 'WRITE_OBJECT',
      successType: 'WRITE_OBJECT_SUCCESS',
      timeoutMessage: 'Write operation timeout',
      matchesSuccess: (data) => data.hash === params.hash,
      payload: {
        objectType: params.objectType,
        hash: params.hash,
        event: params.event,
        parsed: params.parsed,
        version: params.version,
        hidden: params.hidden,
        index: params.index,
        // The worker always receives an explicit published value.
        published: params.published ?? false,
      },
    })
  }

  /**
   * Fallback: writes the object through objectCache on the calling thread.
   * Optional fields are only passed when defined.
   * NOTE(review): unlike the worker path, `published` is not defaulted to `false` here - confirm objectCache.set's default.
   */
  private async writeObjectDirect(params: WriteObjectParams): Promise<void> {
    const { objectCache } = await import('./objectCache')
    await objectCache.set({
      objectType: params.objectType,
      hash: params.hash,
      event: params.event,
      parsed: params.parsed,
      version: params.version,
      hidden: params.hidden,
      ...(params.index !== undefined ? { index: params.index } : {}),
      ...(params.published !== undefined ? { published: params.published } : {}),
    })
  }

  /**
   * Sends an UPDATE_PUBLISHED request; the answer is matched on the object id.
   */
  private postUpdatePublishedToWorker(params: {
    objectType: ObjectType
    id: string
    published: false | string[]
  }): Promise<void> {
    return this.requestWorker({
      operation: 'UPDATE_PUBLISHED',
      successType: 'UPDATE_PUBLISHED_SUCCESS',
      timeoutMessage: 'Update published operation timeout',
      matchesSuccess: (data) => data.id === params.id,
      payload: { ...params },
    })
  }

  /**
   * Fallback: updates the published status through objectCache on the calling thread.
   */
  private async updatePublishedDirect(params: { objectType: ObjectType; id: string; published: false | string[] }): Promise<void> {
    const { objectCache } = await import('./objectCache')
    await objectCache.updatePublished(params.objectType, params.id, params.published)
  }

  /**
   * Create notification (via Web Worker, or directly in fallback mode).
   * Failures are logged and rethrown.
   */
  async createNotification(params: CreateNotificationParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Awaited (not returned) so that a worker-side rejection reaches the catch below.
        await this.postCreateNotificationToWorker(params)
        return
      }
      await this.createNotificationDirect(params)
    } catch (error) {
      console.error('[WriteService] Error creating notification:', error)
      throw error
    }
  }

  /**
   * Sends a CREATE_NOTIFICATION request; the answer is matched on the event id.
   */
  private postCreateNotificationToWorker(params: CreateNotificationParams): Promise<void> {
    return this.requestWorker({
      operation: 'CREATE_NOTIFICATION',
      successType: 'CREATE_NOTIFICATION_SUCCESS',
      timeoutMessage: 'Create notification operation timeout',
      matchesSuccess: (data) => data.eventId === params.eventId,
      payload: {
        type: params.type,
        objectType: params.objectType,
        objectId: params.objectId,
        eventId: params.eventId,
        notificationData: params.data,
      },
    })
  }

  /**
   * Fallback: creates the notification through notificationService on the calling thread.
   * `data` is only set when provided.
   */
  private async createNotificationDirect(params: CreateNotificationParams): Promise<void> {
    const { notificationService } = await import('./notificationService')
    const notificationParams: Parameters<typeof notificationService.createNotification>[0] = {
      type: params.type as Parameters<typeof notificationService.createNotification>[0]['type'],
      objectType: params.objectType,
      objectId: params.objectId,
      eventId: params.eventId,
    }
    if (params.data !== undefined) {
      notificationParams.data = params.data
    }
    await notificationService.createNotification(notificationParams)
  }

  /**
   * Log publication (via Web Worker, or directly in fallback mode).
   * Best effort: failures are logged and never thrown to the caller.
   */
  async logPublication(params: LogPublicationParams): Promise<void> {
    try {
      await this.init()
      if (this.writeWorker) {
        // Fire and forget: the worker's response is not awaited for logs.
        this.writeWorker.postMessage({
          type: 'LOG_PUBLICATION',
          data: {
            eventId: params.eventId,
            relayUrl: params.relayUrl,
            success: params.success,
            error: params.error,
            objectType: params.objectType,
            objectId: params.objectId,
          },
        })
      } else {
        // Fallback: direct write
        const { publishLog } = await import('./publishLog')
        await publishLog.logPublicationDirect(params)
      }
    } catch (logError) {
      console.error('[WriteService] Error logging publication:', logError)
      // Don't throw for logs
    }
  }

  /**
   * Terminate the worker. The next operation creates a new one.
   * Does nothing when no worker is active (including fallback mode).
   */
  terminate(): void {
    if (this.writeWorker) {
      this.writeWorker.terminate()
      this.writeWorker = null
      this.initPromise = null
    }
  }
}
/**
 * Reports whether the current environment exposes the Web Worker API:
 * a global `window` must exist and carry a truthy `Worker` member.
 */
function isWebWorkerAvailable(): boolean {
  if (typeof window === 'undefined') {
    return false
  }
  return Boolean(window.Worker)
}
/**
* Shared module-level WriteService instance.
* Constructing it does not start the worker; the worker is created on the first operation.
*/
export const writeService = new WriteService()