Compare commits

..

8 Commits

Author SHA1 Message Date
Sosthene
1664b4aa69 Various minor fixes 2025-08-25 01:14:28 +02:00
Sosthene
832fa171d8 Add getMyProcesses api 2025-08-25 01:14:28 +02:00
Sosthene
770eedf39d Add getMyProcesses 2025-08-25 01:14:28 +02:00
Sosthene
64775af683 Fix handleApiReturn 2025-08-25 01:14:28 +02:00
Sosthene
33dbf552a6 Add decryptAttribute 2025-08-25 01:14:28 +02:00
Sosthene
bd10842a53 Add getRoles 2025-08-25 01:14:28 +02:00
Sosthene
6cd7fe3204 [bug] Add processes as arg for parse_cipher 2025-08-25 01:14:28 +02:00
Sosthene
45a1478210 [bug] make shutdown reliable 2025-08-25 01:14:28 +02:00
3 changed files with 270 additions and 19 deletions

View File

@ -342,6 +342,8 @@ export class RelayManager {
break; break;
case "Commit": case "Commit":
console.log(`📨 Commit response from relay ${relayId}`); console.log(`📨 Commit response from relay ${relayId}`);
// If we receive a commit response, that's basically an error
console.error(`❌ Commit response from relay ${relayId}:`, message.error);
break; break;
case "Cipher": case "Cipher":
console.log(`📨 Cipher response from relay ${relayId}`); console.log(`📨 Cipher response from relay ${relayId}`);

View File

@ -542,8 +542,9 @@ export class Service {
async parseCipher(message: string): Promise<void> { async parseCipher(message: string): Promise<void> {
const membersList = this.getAllMembers(); const membersList = this.getAllMembers();
const processes = Object.fromEntries(this.getProcesses());
try { try {
const apiReturn = wasm.parse_cipher(message, membersList); const apiReturn = wasm.parse_cipher(message, membersList, processes);
await this.handleApiReturn(apiReturn); await this.handleApiReturn(apiReturn);
} catch (e) { } catch (e) {
console.error(`Failed to parse cipher: ${e}`); console.error(`Failed to parse cipher: ${e}`);
@ -607,7 +608,8 @@ export class Service {
const result = wasm.create_update_message(process, stateId, this.membersList); const result = wasm.create_update_message(process, stateId, this.membersList);
return result; return result;
} catch (error) { } catch (error) {
throw new Error(`Failed to create update message: ${error}`); const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
} }
} }
@ -624,7 +626,8 @@ export class Service {
const result = wasm.validate_state(process, stateId, this.membersList); const result = wasm.validate_state(process, stateId, this.membersList);
return result; return result;
} catch (error) { } catch (error) {
throw new Error(`Failed to validate state: ${error}`); const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
} }
} }
@ -661,7 +664,42 @@ export class Service {
throw new Error('Failed to update process'); throw new Error('Failed to update process');
} }
} catch (error) { } catch (error) {
throw new Error(`WASM error: ${error}`); const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
}
}
public async getMyProcesses(): Promise<string[] | null> {
// If we're not paired yet, just skip it
let pairingProcessId = null;
try {
pairingProcessId = this.getPairingProcessId();
} catch (e) {
return null;
}
if (!pairingProcessId) {
return null;
}
try {
const newMyProcesses = new Set<string>();
// MyProcesses automatically contains pairing process
newMyProcesses.add(pairingProcessId);
for (const [processId, process] of Object.entries(this.processes)) {
try {
const roles = this.getRoles(process);
if (roles && this.rolesContainsMember(roles, pairingProcessId)) {
newMyProcesses.add(processId);
}
} catch (e) {
console.error(e);
}
}
return Array.from(newMyProcesses);
} catch (e) {
console.error("Failed to get processes:", e);
return null;
} }
} }
@ -708,8 +746,11 @@ export class Service {
} }
} }
// Database method: Get All Processes public getProcesses(): Map<string, Process> {
async getAllProcesses(): Promise<Record<string, any>> { return this.processes;
}
async getAllProcessesFromDb(): Promise<Record<string, any>> {
try { try {
const db = await Database.getInstance(); const db = await Database.getInstance();
const processes = await db.dumpStore('processes'); const processes = await db.dumpStore('processes');
@ -844,6 +885,19 @@ export class Service {
return null; return null;
} }
public getRoles(process: Process): Record<string, RoleDefinition> | null {
const lastCommitedState = this.getLastCommitedState(process);
if (lastCommitedState && lastCommitedState.roles && Object.keys(lastCommitedState.roles).length != 0) {
return lastCommitedState!.roles;
} else if (process.states.length === 2) {
const firstState = process.states[0];
if (firstState && firstState.roles && Object.keys(firstState.roles).length != 0) {
return firstState!.roles;
}
}
return null;
}
public rolesContainsUs(roles: Record<string, RoleDefinition>): boolean { public rolesContainsUs(roles: Record<string, RoleDefinition>): boolean {
let us; let us;
try { try {
@ -964,6 +1018,23 @@ export class Service {
} }
public async handleApiReturn(apiReturn: ApiReturn) { public async handleApiReturn(apiReturn: ApiReturn) {
// Check for errors in the returned objects
if (apiReturn.new_tx_to_send && apiReturn.new_tx_to_send.error) {
const error = apiReturn.new_tx_to_send.error;
const errorMessage = typeof error === 'object' && error !== null ?
(error as any).GenericError || JSON.stringify(error) :
String(error);
throw new Error(`Transaction error: ${errorMessage}`);
}
if (apiReturn.commit_to_send && apiReturn.commit_to_send.error) {
const error = apiReturn.commit_to_send.error;
const errorMessage = typeof error === 'object' && error !== null ?
(error as any).GenericError || JSON.stringify(error) :
String(error);
throw new Error(`Commit error: ${errorMessage}`);
}
if (apiReturn.partial_tx) { if (apiReturn.partial_tx) {
try { try {
const res = wasm.sign_transaction(apiReturn.partial_tx); const res = wasm.sign_transaction(apiReturn.partial_tx);
@ -1091,6 +1162,77 @@ export class Service {
} }
} }
async decryptAttribute(processId: string, state: ProcessState, attribute: string): Promise<any | null> {
let hash = state.pcd_commitment[attribute];
if (!hash) {
// attribute doesn't exist
return null;
}
let key = state.keys[attribute];
const pairingProcessId = this.getPairingProcessId();
// If key is missing, request an update and then retry
if (!key) {
const roles = state.roles;
let hasAccess = false;
// If we're not supposed to have access to this attribute, ignore
for (const role of Object.values(roles)) {
for (const rule of Object.values(role.validation_rules)) {
if (rule.fields.includes(attribute)) {
if (role.members.includes(pairingProcessId)) {
// We have access to this attribute
hasAccess = true;
break;
}
}
}
}
if (!hasAccess) return null;
// We should have the key, so we're going to ask other members for it
await this.requestDataFromPeers(processId, [state.state_id], [state.roles]);
const maxRetries = 5;
const retryDelay = 500; // delay in milliseconds
let retries = 0;
while ((!hash || !key) && retries < maxRetries) {
await new Promise(resolve => setTimeout(resolve, retryDelay));
// Re-read hash and key after waiting
hash = state.pcd_commitment[attribute];
key = state.keys[attribute];
retries++;
}
}
if (hash && key) {
const blob = await this.getBlobFromDb(hash);
if (blob) {
// Decrypt the data
const buf = await blob.arrayBuffer();
const cipher = new Uint8Array(buf);
const keyUIntArray = this.hexToUInt8Array(key);
try {
const clear = wasm.decrypt_data(keyUIntArray, cipher);
if (clear) {
// deserialize the result to get the actual data
const decoded = wasm.decode_value(clear);
return decoded;
} else {
throw new Error('decrypt_data returned null');
}
} catch (e) {
console.error(`Failed to decrypt data: ${e}`);
}
}
}
return null;
}
decodeValue(value: number[]): any | null { decodeValue(value: number[]): any | null {
try { try {
return wasm.decode_value(new Uint8Array(value)); return wasm.decode_value(new Uint8Array(value));

View File

@ -4,7 +4,6 @@ import { config } from './config';
import { Service } from './service'; import { Service } from './service';
import { ApiReturn, Process } from '../pkg/sdk_client'; import { ApiReturn, Process } from '../pkg/sdk_client';
import { EMPTY32BYTES } from './utils'; import { EMPTY32BYTES } from './utils';
import { rust_zstd_wasm_shim_calloc } from '../pkg/sdk_client_bg.wasm';
interface ServerMessageEvent { interface ServerMessageEvent {
data: { data: {
@ -65,7 +64,8 @@ class SimpleProcessHandlers {
res = await this.service.createPrdUpdate(processId, stateId); res = await this.service.createPrdUpdate(processId, stateId);
await this.service.handleApiReturn(res); await this.service.handleApiReturn(res);
} catch (e) { } catch (e) {
throw new Error(e as string); const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
} }
return { return {
@ -73,8 +73,10 @@ class SimpleProcessHandlers {
messageId: event.data.messageId messageId: event.data.messageId
}; };
} catch (e) { } catch (e) {
const errorMsg = `Failed to notify update for process: ${e}`; const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMsg); // Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
} }
} }
@ -100,7 +102,8 @@ class SimpleProcessHandlers {
res = await this.service.approveChange(processId, stateId); res = await this.service.approveChange(processId, stateId);
await this.service.handleApiReturn(res); await this.service.handleApiReturn(res);
} catch (e) { } catch (e) {
throw new Error(e as string); const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
} }
return { return {
@ -109,8 +112,10 @@ class SimpleProcessHandlers {
messageId: event.data.messageId messageId: event.data.messageId
}; };
} catch (e) { } catch (e) {
const errorMsg = `Failed to validate process: ${e}`; const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMsg); // Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
} }
} }
@ -199,12 +204,81 @@ class SimpleProcessHandlers {
messageId: event.data.messageId messageId: event.data.messageId
}; };
} catch (e) { } catch (e) {
const errorMsg = `Failed to update process: ${e}`; const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMsg); // Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
} }
} }
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse > { async handleGetMyProcesses(event: ServerMessageEvent): Promise<ServerResponse> {
if (event.data.type !== MessageType.GET_MY_PROCESSES) {
throw new Error('Invalid message type');
}
const processes = this.service.getProcesses();
const myProcesses = await this.service.getMyProcesses();
if (!myProcesses || myProcesses.length === 0) {
throw new Error('No my processes found');
}
const filteredProcesses: Record<string, Process> = {};
for (const processId of myProcesses) {
const process = processes.get(processId);
console.log(processId, ':', process);
if (process) {
filteredProcesses[processId] = process;
}
}
const data: Record<string, any> = {};
// Now we decrypt all we can in the processes
for (const [processId, process] of Object.entries(filteredProcesses)) {
// We also take the public data
const lastState = this.service.getLastCommitedState(process);
if (!lastState) {
console.error(`❌ Process ${processId} doesn't have a commited state`);
continue;
}
const processData: Record<string, any> = {};
for (const attribute of Object.keys(lastState.public_data)) {
try {
const value = this.service.decodeValue(lastState.public_data[attribute]);
if (value) {
processData[attribute] = value;
}
} catch (e) {
console.error(`❌ Error decoding public data ${attribute} for process ${processId}:`, e);
}
}
for (let i = process.states.length - 2; i >= 0; i--) {
const state = process.states[i];
for (const attribute of Object.keys(state.keys)) {
if (processData[attribute] !== undefined && processData[attribute] !== null) continue;
try {
const value = await this.service.decryptAttribute(processId, state, attribute);
if (value) {
processData[attribute] = value;
}
} catch (e) {
console.error(`❌ Error decrypting attribute ${attribute} for process ${processId}:`, e);
}
}
}
data[processId] = processData;
}
return {
type: MessageType.PROCESSES_RETRIEVED,
processes: filteredProcesses,
data,
messageId: event.data.messageId
};
}
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse> {
try { try {
switch (event.data.type) { switch (event.data.type) {
case MessageType.NOTIFY_UPDATE: case MessageType.NOTIFY_UPDATE:
@ -213,11 +287,14 @@ class SimpleProcessHandlers {
return await this.handleValidateState(event); return await this.handleValidateState(event);
case MessageType.UPDATE_PROCESS: case MessageType.UPDATE_PROCESS:
return await this.handleUpdateProcess(event); return await this.handleUpdateProcess(event);
case MessageType.GET_MY_PROCESSES:
return await this.handleGetMyProcesses(event);
default: default:
throw new Error(`Unhandled message type: ${event.data.type}`); throw new Error(`Unhandled message type: ${event.data.type}`);
} }
} catch (error) { } catch (error) {
return this.errorResponse(error as string, event.clientId, event.data.messageId); const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
return this.errorResponse(errorMessage, event.clientId, event.data.messageId);
} }
} }
} }
@ -301,7 +378,11 @@ export class Server {
console.log('🔑 Already paired with id:', service.getPairingProcessId()); console.log('🔑 Already paired with id:', service.getPairingProcessId());
} }
// Relays are automatically initialized in Service constructor // Get all processes from database
await service.getAllProcessesFromDb();
// Connect to relays
await service.connectToRelaysAndWaitForHandshake();
console.log(`✅ Simple server running on port ${this.wss.options.port}`); console.log(`✅ Simple server running on port ${this.wss.options.port}`);
console.log('📋 Supported operations: UPDATE_PROCESS, NOTIFY_UPDATE, VALIDATE_STATE'); console.log('📋 Supported operations: UPDATE_PROCESS, NOTIFY_UPDATE, VALIDATE_STATE');
@ -341,9 +422,10 @@ export class Server {
this.sendToClient(ws, response); this.sendToClient(ws, response);
} catch (error) { } catch (error) {
console.error(`❌ Error handling message from ${clientId}:`, error); console.error(`❌ Error handling message from ${clientId}:`, error);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
this.sendToClient(ws, { this.sendToClient(ws, {
type: MessageType.ERROR, type: MessageType.ERROR,
error: `Server error: ${error instanceof Error ? error.message : String(error)}`, error: `Server error: ${errorMessage}`,
messageId: JSON.parse(data.toString())?.messageId messageId: JSON.parse(data.toString())?.messageId
}); });
} }
@ -377,25 +459,50 @@ export class Server {
public shutdown() { public shutdown() {
console.log('🛑 Shutting down server...'); console.log('🛑 Shutting down server...');
// Close all active client connections first
for (const [ws, clientId] of this.clients.entries()) {
console.log(`🔌 Closing connection to ${clientId}...`);
ws.close(1000, 'Server shutting down');
}
this.clients.clear();
// Close the WebSocket server
this.wss.close(() => { this.wss.close(() => {
console.log('✅ Server shutdown complete'); console.log('✅ Server shutdown complete');
process.exit(0); process.exit(0);
}); });
// Force exit after a timeout if graceful shutdown fails
setTimeout(() => {
console.log('⚠️ Force shutdown after timeout');
process.exit(1);
}, 5000);
} }
} }
// Handle graceful shutdown // Handle graceful shutdown
let isShuttingDown = false;
process.on('SIGINT', () => { process.on('SIGINT', () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\n🛑 Received SIGINT, shutting down gracefully...'); console.log('\n🛑 Received SIGINT, shutting down gracefully...');
if (server) { if (server) {
server.shutdown(); server.shutdown();
} else {
process.exit(0);
} }
}); });
process.on('SIGTERM', () => { process.on('SIGTERM', () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\n🛑 Received SIGTERM, shutting down gracefully...'); console.log('\n🛑 Received SIGTERM, shutting down gracefully...');
if (server) { if (server) {
server.shutdown(); server.shutdown();
} else {
process.exit(0);
} }
}); });