Add background-sync service

This commit is contained in:
Sosthene 2025-09-08 15:49:03 +02:00
parent 1408b2ddf3
commit 8e61990f79
4 changed files with 377 additions and 0 deletions

View File

@ -4,6 +4,20 @@ Toutes les modifications notables de ce projet seront documentées ici.
## [Unreleased]
### Added
- Service de Background Sync pour la détection automatique des données manquantes
- Surveillance périodique des `pcd_commitment` (toutes les 30 secondes)
- API WebSocket pour contrôler le background sync (`FORCE_DATA_SCAN`, `GET_BACKGROUND_SYNC_STATUS`)
- Gestion automatique des entrées `diff` pour le tracking des données manquantes
- Récupération automatique des données manquantes auprès des pairs
- Scripts de test et validation du service de background sync
- Documentation complète dans `docs/BACKGROUND_SYNC.md`
### Changed
- Intégration du background sync dans le service principal
- Démarrage automatique du background sync avec le serveur
- Arrêt propre du background sync lors de l'arrêt du serveur
## [0.1.1] - 2025-08-26
- Bump version package.json à 0.1.1
- Documentation déploiement mise à jour (exemples tag)

View File

@ -0,0 +1,284 @@
// Service de surveillance en arrière-plan pour Node.js
// Équivalent du service worker pour la détection des données manquantes
import { Service } from './service';
import Database from './database.service';
import { config } from './config';
export class BackgroundSyncService {
private static instance: BackgroundSyncService;
private service: Service;
private db: Database | null = null;
private scanInterval: NodeJS.Timeout | null = null;
private isRunning: boolean = false;
private readonly SCAN_INTERVAL_MS = 30000; // 30 secondes
private readonly EMPTY32BYTES = '0'.repeat(64);
private constructor() {
this.service = Service.getInstance();
console.log('🔧 BackgroundSyncService initialized');
}
static async getInstance(): Promise<BackgroundSyncService> {
if (!BackgroundSyncService.instance) {
BackgroundSyncService.instance = new BackgroundSyncService();
await BackgroundSyncService.instance.init();
}
return BackgroundSyncService.instance;
}
private async init(): Promise<void> {
this.db = await Database.getInstance();
}
/**
* Démarre le service de surveillance en arrière-plan
*/
public async start(): Promise<void> {
if (this.isRunning) {
console.log('⚠️ BackgroundSyncService already running');
return;
}
console.log('🚀 Starting background sync service...');
this.isRunning = true;
// Scan immédiat au démarrage
await this.scanMissingData();
// Puis scan périodique
this.scanInterval = setInterval(async () => {
try {
await this.scanMissingData();
} catch (error) {
console.error('❌ Error in background scan:', error);
}
}, this.SCAN_INTERVAL_MS);
console.log('✅ Background sync service started');
}
/**
* Arrête le service de surveillance
*/
public stop(): void {
if (!this.isRunning) {
return;
}
console.log('🛑 Stopping background sync service...');
this.isRunning = false;
if (this.scanInterval) {
clearInterval(this.scanInterval);
this.scanInterval = null;
}
console.log('✅ Background sync service stopped');
}
/**
* Scan manuel des données manquantes (équivalent à scanMissingData du service worker)
*/
public async scanMissingData(): Promise<string[]> {
console.log('🔍 Scanning for missing data...');
try {
const myProcesses = await this.getMyProcesses();
if (!myProcesses || myProcesses.length === 0) {
console.log('No processes to scan');
return [];
}
const toDownload = new Set<string>();
for (const processId of myProcesses) {
const process = await this.service.getProcess(processId);
if (!process) continue;
for (const state of process.states) {
if (state.state_id === this.EMPTY32BYTES) continue;
// Vérifier chaque pcd_commitment
for (const [field, hash] of Object.entries(state.pcd_commitment)) {
// Ignorer les champs publics
if (state.public_data[field] !== undefined || field === 'roles') continue;
// Vérifier que hash est une string
if (typeof hash !== 'string') continue;
// Vérifier si on a déjà les données
const existingData = await this.getBlob(hash);
if (!existingData) {
toDownload.add(hash);
// Ajouter une entrée diff si elle n'existe pas
await this.addDiff(processId, state.state_id, hash, state.roles, field);
}
}
}
}
const missingHashes = Array.from(toDownload);
if (missingHashes.length > 0) {
console.log(`📥 Found ${missingHashes.length} missing data hashes:`, missingHashes);
await this.requestMissingData(missingHashes);
} else {
console.log('✅ No missing data found');
}
return missingHashes;
} catch (error) {
console.error('❌ Error scanning missing data:', error);
throw error;
}
}
/**
* Récupère les processus de l'utilisateur
*/
private async getMyProcesses(): Promise<string[]> {
try {
return await this.service.getMyProcesses() || [];
} catch (error) {
console.error('Error getting my processes:', error);
return [];
}
}
/**
* Vérifie si un blob existe en base
*/
private async getBlob(hash: string): Promise<Buffer | null> {
try {
return await this.service.getBufferFromDb(hash);
} catch (error) {
return null;
}
}
/**
* Ajoute une entrée diff pour tracking
*/
private async addDiff(
processId: string,
stateId: string,
hash: string,
roles: any,
field: string
): Promise<void> {
try {
if (!this.db) {
console.error('Database not initialized');
return;
}
const existingDiff = await this.db.getObject('diffs', hash);
if (!existingDiff) {
const newDiff = {
process_id: processId,
state_id: stateId,
value_commitment: hash,
roles: roles,
field: field,
description: null,
previous_value: null,
new_value: null,
notify_user: false,
need_validation: false,
validation_status: 'None'
};
await this.db.addObject({
storeName: 'diffs',
object: newDiff,
key: hash
});
console.log(`📝 Added diff entry for hash: ${hash}`);
}
} catch (error) {
console.error('Error adding diff:', error);
}
}
/**
* Demande les données manquantes aux pairs
*/
private async requestMissingData(hashes: string[]): Promise<void> {
try {
console.log('🔄 Requesting missing data from peers...');
// Récupérer tous les processus pour déterminer les rôles
const myProcesses = await this.getMyProcesses();
const processesToRequest: Record<string, any> = {};
for (const processId of myProcesses) {
const process = await this.service.getProcess(processId);
if (process) {
processesToRequest[processId] = process;
}
}
// Pour chaque hash manquant, essayer de le récupérer
for (const hash of hashes) {
try {
// Trouver le diff correspondant
const diffs = await this.service.getDiffsFromDb();
const diff = Object.values(diffs).find((d: any) => d.value_commitment === hash);
if (diff) {
const processId = diff.process_id;
const stateId = diff.state_id;
const roles = diff.roles;
if (processesToRequest[processId]) {
console.log(`🔄 Requesting data for hash ${hash} from process ${processId}`);
await this.service.requestDataFromPeers(processId, [stateId], [roles]);
}
}
} catch (error) {
console.error(`Error requesting data for hash ${hash}:`, error);
}
}
} catch (error) {
console.error('Error requesting missing data:', error);
}
}
/**
* Force un scan immédiat (pour tests ou usage manuel)
*/
public async forceScan(): Promise<string[]> {
console.log('🔄 Forcing immediate scan...');
return await this.scanMissingData();
}
/**
* Obtient le statut du service
*/
public getStatus(): { isRunning: boolean; scanInterval: number } {
return {
isRunning: this.isRunning,
scanInterval: this.SCAN_INTERVAL_MS
};
}
/**
* Met à jour l'intervalle de scan
*/
public setScanInterval(intervalMs: number): void {
if (this.isRunning && this.scanInterval) {
clearInterval(this.scanInterval);
this.scanInterval = setInterval(async () => {
try {
await this.scanMissingData();
} catch (error) {
console.error('❌ Error in background scan:', error);
}
}, intervalMs);
}
}
}

View File

@ -37,6 +37,11 @@ export enum MessageType {
// Account management
ADD_DEVICE = 'ADD_DEVICE',
DEVICE_ADDED = 'DEVICE_ADDED',
// Background sync
FORCE_DATA_SCAN = 'FORCE_DATA_SCAN',
DATA_SCAN_RESULT = 'DATA_SCAN_RESULT',
GET_BACKGROUND_SYNC_STATUS = 'GET_BACKGROUND_SYNC_STATUS',
BACKGROUND_SYNC_STATUS = 'BACKGROUND_SYNC_STATUS',
}
// Re-export AnkFlag from WASM for relay message typing

View File

@ -321,6 +321,57 @@ class SimpleProcessHandlers {
}
}
async handleForceDataScan(event: ServerMessageEvent): Promise<ServerResponse> {
if (event.data.type !== MessageType.FORCE_DATA_SCAN) {
throw new Error('Invalid message type');
}
try {
const { apiKey } = event.data;
if (!apiKey || !this.validateApiKey(apiKey)) {
throw new Error('Invalid API key');
}
const missingHashes = await this.service.forceDataScan();
return {
type: MessageType.DATA_SCAN_RESULT,
missingHashes,
count: missingHashes.length,
messageId: event.data.messageId
};
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
}
}
async handleGetBackgroundSyncStatus(event: ServerMessageEvent): Promise<ServerResponse> {
if (event.data.type !== MessageType.GET_BACKGROUND_SYNC_STATUS) {
throw new Error('Invalid message type');
}
try {
const { apiKey } = event.data;
if (!apiKey || !this.validateApiKey(apiKey)) {
throw new Error('Invalid API key');
}
const status = this.service.getBackgroundSyncStatus();
return {
type: MessageType.BACKGROUND_SYNC_STATUS,
status,
messageId: event.data.messageId
};
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
}
}
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse> {
try {
switch (event.data.type) {
@ -336,6 +387,10 @@ class SimpleProcessHandlers {
return await this.handleGetMyProcesses(event);
case MessageType.GET_PAIRING_ID:
return await this.handleGetPairingId(event);
case MessageType.FORCE_DATA_SCAN:
return await this.handleForceDataScan(event);
case MessageType.GET_BACKGROUND_SYNC_STATUS:
return await this.handleGetBackgroundSyncStatus(event);
default:
throw new Error(`Unhandled message type: ${event.data.type}`);
}
@ -433,10 +488,20 @@ export class Server {
// Connect to relays
await service.connectToRelaysAndWaitForHandshake();
// Start background sync service for missing data detection
try {
await service.startBackgroundSync();
console.log('🔄 Background sync service started');
} catch (error) {
console.error('❌ Failed to start background sync service:', error);
// Don't exit, continue without background sync
}
console.log(`✅ Simple server running on port ${this.wss.options.port}`);
console.log('📋 Supported operations: UPDATE_PROCESS, NOTIFY_UPDATE, VALIDATE_STATE');
console.log('🔑 Authentication: API key required for all operations');
console.log('🔧 Services: Integrated with SimpleService protocol logic');
console.log('🔄 Background sync: Automatic missing data detection enabled');
} catch (error) {
console.error('❌ Failed to initialize server:', error);
@ -509,6 +574,15 @@ export class Server {
public shutdown() {
console.log('🛑 Shutting down server...');
// Stop background sync service
try {
const service = Service.getInstance();
service.stopBackgroundSync();
console.log('🔄 Background sync service stopped');
} catch (error) {
console.error('❌ Error stopping background sync service:', error);
}
// Close all active client connections first
for (const [ws, clientId] of this.clients.entries()) {
console.log(`🔌 Closing connection to ${clientId}...`);