story-research-zapwall/lib/userContentSync.ts
2026-01-06 21:44:43 +01:00

816 lines
25 KiB
TypeScript

/**
* Synchronize user content (profile, series, publications) to IndexedDB cache
* Called after key import to ensure all user content is cached locally
*/
import type { Event } from 'nostr-tools'
import { nostrService } from './nostr'
import { fetchAuthorPresentationFromPool } from './articlePublisherHelpersPresentation'
import { extractTagsFromEvent } from './nostrTagSystem'
import { extractSeriesFromEvent, extractPublicationFromEvent, extractPurchaseFromEvent, extractSponsoringFromEvent, extractReviewTipFromEvent } from './metadataExtractor'
import { objectCache } from './objectCache'
import { getLatestVersion } from './versionManager'
import { buildTagFilter } from './nostrTagSystemFilter'
import { getPrimaryRelaySync } from './config'
import { tryWithRelayRotation } from './relayRotation'
import { PLATFORM_SERVICE } from './platformConfig'
import { parseObjectId } from './urlGenerator'
import type { SimplePoolWithSub } from '@/types/nostr-tools-extended'
/**
* Fetch all publications by an author and cache them
*/
async function fetchAndCachePublications(
pool: SimplePoolWithSub,
authorPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
const filters = [
{
...buildTagFilter({
type: 'publication',
authorPubkey,
service: PLATFORM_SERVICE,
}),
since: lastSyncDate,
limit: 1000, // Get all publications
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let sub: ReturnType<typeof createSubscription> | null = null
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
return createSubscription(poolWithSub, [relayUrl], filters)
},
5000 // 5 second timeout per relay
)
sub = result
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
sub = createSubscription(pool, [usedRelayUrl], filters)
}
if (!sub) {
throw new Error('Failed to create subscription')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
sub.unsub()
// Group events by hash ID and cache the latest version of each
const eventsByHashId = new Map<string, Event[]>()
for (const event of events) {
const tags = extractTagsFromEvent(event)
if (tags.id) {
// Extract hash from id (can be <hash>_<index>_<version> or just hash)
const parsed = parseObjectId(tags.id)
const hash = parsed.hash ?? tags.id
if (!eventsByHashId.has(hash)) {
eventsByHashId.set(hash, [])
}
eventsByHashId.get(hash)!.push(event)
}
}
// Cache each publication
for (const [_hash, hashEvents] of eventsByHashId.entries()) {
const latestEvent = getLatestVersion(hashEvents)
if (latestEvent) {
const extracted = await extractPublicationFromEvent(latestEvent)
if (extracted) {
const publicationParsed = parseObjectId(extracted.id)
const extractedHash = publicationParsed.hash ?? extracted.id
const extractedIndex = publicationParsed.index ?? 0
const tags = extractTagsFromEvent(latestEvent)
await objectCache.set(
'publication',
extractedHash,
latestEvent,
extracted,
tags.version ?? 0,
tags.hidden ?? false,
extractedIndex
)
}
}
}
resolve()
}
sub.on('event', (event: Event): void => {
const tags = extractTagsFromEvent(event)
if (tags.type === 'publication' && !tags.hidden) {
events.push(event)
}
})
sub.on('eose', (): void => {
void done()
})
setTimeout((): void => {
void done()
}, 10000).unref?.()
})
}
/**
* Fetch all series by an author and cache them
*/
async function fetchAndCacheSeries(
pool: SimplePoolWithSub,
authorPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
// Fetch all events for series to cache them properly
const filters = [
{
...buildTagFilter({
type: 'series',
authorPubkey,
service: PLATFORM_SERVICE,
}),
since: lastSyncDate,
limit: 1000, // Get all series events
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let sub: ReturnType<typeof createSubscription> | null = null
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
return createSubscription(poolWithSub, [relayUrl], filters)
},
5000 // 5 second timeout per relay
)
sub = result
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
sub = createSubscription(pool, [usedRelayUrl], filters)
}
if (!sub) {
throw new Error('Failed to create subscription')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
sub.unsub()
// Group events by hash ID and cache the latest version of each
const eventsByHashId = new Map<string, Event[]>()
for (const event of events) {
const tags = extractTagsFromEvent(event)
if (tags.id) {
// Extract hash from id (can be <hash>_<index>_<version> or just hash)
const seriesParsed = parseObjectId(tags.id)
const hash = seriesParsed.hash ?? tags.id
if (!eventsByHashId.has(hash)) {
eventsByHashId.set(hash, [])
}
eventsByHashId.get(hash)!.push(event)
}
}
// Cache each series
for (const [_hash, hashEvents] of eventsByHashId.entries()) {
const latestEvent = getLatestVersion(hashEvents)
if (latestEvent) {
const extracted = await extractSeriesFromEvent(latestEvent)
if (extracted) {
const publicationParsed = parseObjectId(extracted.id)
const extractedHash = publicationParsed.hash ?? extracted.id
const extractedIndex = publicationParsed.index ?? 0
const tags = extractTagsFromEvent(latestEvent)
await objectCache.set(
'series',
extractedHash,
latestEvent,
extracted,
tags.version ?? 0,
tags.hidden ?? false,
extractedIndex
)
}
}
}
resolve()
}
sub.on('event', (event: Event): void => {
const tags = extractTagsFromEvent(event)
if (tags.type === 'series' && !tags.hidden) {
console.log('[Sync] Received series event:', event.id)
events.push(event)
}
})
sub.on('eose', (): void => {
console.log(`[Sync] EOSE for series, received ${events.length} events`)
void done()
})
setTimeout((): void => {
if (!finished) {
console.log(`[Sync] Timeout for series, received ${events.length} events`)
}
void done()
}, 10000).unref?.()
})
}
/**
* Fetch all purchases by a payer and cache them
*/
async function fetchAndCachePurchases(
pool: SimplePoolWithSub,
payerPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
const filters = [
{
kinds: [9735], // Zap receipt
authors: [payerPubkey],
'#kind_type': ['purchase'],
since: lastSyncDate,
limit: 1000,
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let sub: ReturnType<typeof createSubscription> | null = null
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
return createSubscription(poolWithSub, [relayUrl], filters)
},
5000 // 5 second timeout per relay
)
sub = result
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
sub = createSubscription(pool, [usedRelayUrl], filters)
}
if (!sub) {
throw new Error('Failed to create subscription')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
sub.unsub()
for (const event of events) {
const extracted = await extractPurchaseFromEvent(event)
if (extracted) {
// Parse to Purchase object for cache
const { parsePurchaseFromEvent } = await import('./nostrEventParsing')
const purchase = await parsePurchaseFromEvent(event)
if (purchase) {
await objectCache.set('purchase', purchase.hash, event, purchase, 0, false, purchase.index)
}
}
}
resolve()
}
sub.on('event', (event: Event): void => {
console.log('[Sync] Received purchase event:', event.id)
events.push(event)
})
sub.on('eose', (): void => {
console.log(`[Sync] EOSE for purchases, received ${events.length} events`)
void done()
})
setTimeout((): void => {
if (!finished) {
console.log(`[Sync] Timeout for purchases, received ${events.length} events`)
}
void done()
}, 10000).unref?.()
})
}
/**
* Fetch all sponsoring by an author and cache them
*/
async function fetchAndCacheSponsoring(
pool: SimplePoolWithSub,
authorPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
const filters = [
{
kinds: [9735], // Zap receipt
'#p': [authorPubkey],
'#kind_type': ['sponsoring'],
since: lastSyncDate,
limit: 1000,
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let sub: ReturnType<typeof createSubscription> | null = null
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
return createSubscription(poolWithSub, [relayUrl], filters)
},
5000 // 5 second timeout per relay
)
sub = result
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
sub = createSubscription(pool, [usedRelayUrl], filters)
}
if (!sub) {
throw new Error('Failed to create subscription')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
sub.unsub()
for (const event of events) {
const extracted = await extractSponsoringFromEvent(event)
if (extracted) {
// Parse to Sponsoring object for cache
const { parseSponsoringFromEvent } = await import('./nostrEventParsing')
const sponsoring = await parseSponsoringFromEvent(event)
if (sponsoring) {
await objectCache.set('sponsoring', sponsoring.hash, event, sponsoring, 0, false, sponsoring.index)
}
}
}
resolve()
}
sub.on('event', (event: Event): void => {
console.log('[Sync] Received sponsoring event:', event.id)
events.push(event)
})
sub.on('eose', (): void => {
console.log(`[Sync] EOSE for sponsoring, received ${events.length} events`)
void done()
})
setTimeout((): void => {
if (!finished) {
console.log(`[Sync] Timeout for sponsoring, received ${events.length} events`)
}
void done()
}, 10000).unref?.()
})
}
/**
* Fetch all review tips by an author and cache them
*/
async function fetchAndCacheReviewTips(
pool: SimplePoolWithSub,
authorPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
const filters = [
{
kinds: [9735], // Zap receipt
'#p': [authorPubkey],
'#kind_type': ['review_tip'],
since: lastSyncDate,
limit: 1000,
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let sub: ReturnType<typeof createSubscription> | null = null
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
return createSubscription(poolWithSub, [relayUrl], filters)
},
5000 // 5 second timeout per relay
)
sub = result
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
sub = createSubscription(pool, [usedRelayUrl], filters)
}
if (!sub) {
throw new Error('Failed to create subscription')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
sub.unsub()
for (const event of events) {
const extracted = await extractReviewTipFromEvent(event)
if (extracted) {
// Parse to ReviewTip object for cache
const { parseReviewTipFromEvent } = await import('./nostrEventParsing')
const reviewTip = await parseReviewTipFromEvent(event)
if (reviewTip) {
await objectCache.set('review_tip', reviewTip.hash, event, reviewTip, 0, false, reviewTip.index)
}
}
}
resolve()
}
sub.on('event', (event: Event): void => {
console.log('[Sync] Received review tip event:', event.id)
events.push(event)
})
sub.on('eose', (): void => {
console.log(`[Sync] EOSE for review tips, received ${events.length} events`)
void done()
})
setTimeout((): void => {
if (!finished) {
console.log(`[Sync] Timeout for review tips, received ${events.length} events`)
}
void done()
}, 10000).unref?.()
})
}
export interface SyncProgress {
currentStep: number
totalSteps: number
completed: boolean
currentRelay?: string // URL of the relay currently being used
}
/**
* Fetch all payment notes (kind 1 with type='payment') by a user and cache them
*/
async function fetchAndCachePaymentNotes(
pool: SimplePoolWithSub,
userPubkey: string
): Promise<void> {
const { getLastSyncDate } = await import('./syncStorage')
const lastSyncDate = await getLastSyncDate()
// Payment notes are kind 1 with type='payment'
// They can be: as payer (authors) or as recipient (#recipient tag)
const filters = [
{
kinds: [1],
authors: [userPubkey],
'#payment': [''],
'#service': [PLATFORM_SERVICE],
since: lastSyncDate,
limit: 1000,
},
{
kinds: [1],
'#recipient': [userPubkey],
'#payment': [''],
'#service': [PLATFORM_SERVICE],
since: lastSyncDate,
limit: 1000,
},
]
// Try relays with rotation (no retry on failure, just move to next)
const { createSubscription } = require('@/types/nostr-tools-extended')
let subscriptions: Array<ReturnType<typeof createSubscription>> = []
let usedRelayUrl = ''
try {
const result = await tryWithRelayRotation(
pool as unknown as import('nostr-tools').SimplePool,
async (relayUrl, poolWithSub) => {
usedRelayUrl = relayUrl
// Notify progress manager that we're starting with a new relay (reset step counter)
const { syncProgressManager } = await import('./syncProgressManager')
const currentProgress = syncProgressManager.getProgress()
if (currentProgress) {
syncProgressManager.setProgress({
...currentProgress,
currentStep: 0,
currentRelay: relayUrl,
})
}
// Create subscriptions for both filters (payer and recipient)
return filters.map((filter) => createSubscription(poolWithSub, [relayUrl], [filter]))
},
5000 // 5 second timeout per relay
)
subscriptions = result.flat()
} catch (rotationError) {
// Fallback to primary relay if rotation fails
usedRelayUrl = getPrimaryRelaySync()
subscriptions = filters.map((filter) => createSubscription(pool, [usedRelayUrl], [filter]))
}
if (subscriptions.length === 0) {
throw new Error('Failed to create subscriptions')
}
const events: Event[] = []
return new Promise<void>((resolve) => {
let finished = false
let eoseCount = 0
const done = async (): Promise<void> => {
if (finished) {
return
}
finished = true
subscriptions.forEach((sub) => sub.unsub())
for (const event of events) {
const tags = extractTagsFromEvent(event)
if (tags.type === 'payment' && tags.payment) {
// Cache the payment note event
// Use event.id as hash since payment notes don't have a separate hash system
await objectCache.set('payment_note', event.id, event, {
id: event.id,
type: 'payment_note',
eventId: event.id,
}, 0, false, 0)
}
}
resolve()
}
subscriptions.forEach((sub) => {
sub.on('event', (event: Event): void => {
const tags = extractTagsFromEvent(event)
if (tags.type === 'payment' && tags.payment) {
console.log('[Sync] Received payment note event:', event.id)
// Deduplicate events (same event might match both filters)
if (!events.some((e) => e.id === event.id)) {
events.push(event)
}
}
})
sub.on('eose', (): void => {
eoseCount++
if (eoseCount >= subscriptions.length) {
console.log(`[Sync] EOSE for payment notes, received ${events.length} events`)
void done()
}
})
})
setTimeout((): void => {
if (!finished) {
console.log(`[Sync] Timeout for payment notes, received ${events.length} events`)
}
void done()
}, 10000).unref?.()
})
}
/**
* Synchronize all user content to IndexedDB cache
* Fetches profile, series, publications, purchases, sponsoring, review tips, and payment notes and caches them
* @param userPubkey - The user's public key
* @param onProgress - Optional callback to report progress (currentStep, totalSteps, completed)
*/
export async function syncUserContentToCache(
userPubkey: string,
onProgress?: (progress: SyncProgress) => void
): Promise<void> {
try {
const pool = nostrService.getPool()
if (!pool) {
const errorMsg = 'Pool not initialized, cannot sync user content'
console.warn(errorMsg)
throw new Error(errorMsg)
}
const poolWithSub = pool as unknown as SimplePoolWithSub
// Get current timestamp for last sync date
const { setLastSyncDate, getCurrentTimestamp } = await import('./syncStorage')
const currentTimestamp = getCurrentTimestamp()
const TOTAL_STEPS = 7
// Report initial progress
const { relaySessionManager } = await import('./relaySessionManager')
const { syncProgressManager } = await import('./syncProgressManager')
const activeRelays = await relaySessionManager.getActiveRelays()
const initialRelay = activeRelays[0] ?? 'Connecting...'
if (onProgress) {
onProgress({ currentStep: 0, totalSteps: TOTAL_STEPS, completed: false, currentRelay: initialRelay })
}
syncProgressManager.setProgress({ currentStep: 0, totalSteps: TOTAL_STEPS, completed: false, currentRelay: initialRelay })
let currentStep = 0
// Helper function to update progress with current relay
const updateProgress = (step: number, completed: boolean = false): void => {
const currentRelay = syncProgressManager.getProgress()?.currentRelay ?? initialRelay
const progressUpdate = { currentStep: step, totalSteps: TOTAL_STEPS, completed, currentRelay }
if (onProgress) {
onProgress(progressUpdate)
}
syncProgressManager.setProgress(progressUpdate)
}
// Fetch and cache author profile (already caches itself)
console.log('[Sync] Step 1/7: Fetching author profile...')
await fetchAuthorPresentationFromPool(poolWithSub, userPubkey)
console.log('[Sync] Step 1/7: Author profile fetch completed')
currentStep++
updateProgress(currentStep)
// Fetch and cache all series
console.log('[Sync] Step 2/7: Fetching series...')
await fetchAndCacheSeries(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep)
// Fetch and cache all publications
console.log('[Sync] Step 3/7: Fetching publications...')
await fetchAndCachePublications(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep)
// Fetch and cache all purchases (as payer)
console.log('[Sync] Step 4/7: Fetching purchases...')
await fetchAndCachePurchases(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep)
// Fetch and cache all sponsoring (as author)
console.log('[Sync] Step 5/7: Fetching sponsoring...')
await fetchAndCacheSponsoring(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep)
// Fetch and cache all review tips (as author)
console.log('[Sync] Step 6/7: Fetching review tips...')
await fetchAndCacheReviewTips(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep)
// Fetch and cache all payment notes (kind 1 with type='payment')
console.log('[Sync] Step 7/7: Fetching payment notes...')
await fetchAndCachePaymentNotes(poolWithSub, userPubkey)
currentStep++
updateProgress(currentStep, true)
// Store the current timestamp as last sync date
await setLastSyncDate(currentTimestamp)
console.log('[Sync] Synchronization completed successfully')
} catch (syncError) {
console.error('Error syncing user content to cache:', syncError)
throw syncError // Re-throw to allow UI to handle it
}
}