Compare commits

...

8 Commits

Author SHA1 Message Date
Sosthene
1664b4aa69 Various minor fixes 2025-08-25 01:14:28 +02:00
Sosthene
832fa171d8 Add getMyProcesses api 2025-08-25 01:14:28 +02:00
Sosthene
770eedf39d Add getMyProcesses 2025-08-25 01:14:28 +02:00
Sosthene
64775af683 Fix handleApiReturn 2025-08-25 01:14:28 +02:00
Sosthene
33dbf552a6 Add decryptAttribute 2025-08-25 01:14:28 +02:00
Sosthene
bd10842a53 Add getRoles 2025-08-25 01:14:28 +02:00
Sosthene
6cd7fe3204 [bug] Add processes as arg for parse_cipher 2025-08-25 01:14:28 +02:00
Sosthene
45a1478210 [bug] make shutdown reliable 2025-08-25 01:14:28 +02:00
3 changed files with 270 additions and 19 deletions

View File

@ -342,6 +342,8 @@ export class RelayManager {
break;
case "Commit":
console.log(`📨 Commit response from relay ${relayId}`);
// If we receive a commit response, that's basically an error
console.error(`❌ Commit response from relay ${relayId}:`, message.error);
break;
case "Cipher":
console.log(`📨 Cipher response from relay ${relayId}`);

View File

@ -542,8 +542,9 @@ export class Service {
async parseCipher(message: string): Promise<void> {
const membersList = this.getAllMembers();
const processes = Object.fromEntries(this.getProcesses());
try {
const apiReturn = wasm.parse_cipher(message, membersList);
const apiReturn = wasm.parse_cipher(message, membersList, processes);
await this.handleApiReturn(apiReturn);
} catch (e) {
console.error(`Failed to parse cipher: ${e}`);
@ -607,7 +608,8 @@ export class Service {
const result = wasm.create_update_message(process, stateId, this.membersList);
return result;
} catch (error) {
throw new Error(`Failed to create update message: ${error}`);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
}
}
@ -624,7 +626,8 @@ export class Service {
const result = wasm.validate_state(process, stateId, this.membersList);
return result;
} catch (error) {
throw new Error(`Failed to validate state: ${error}`);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
}
}
@ -661,7 +664,42 @@ export class Service {
throw new Error('Failed to update process');
}
} catch (error) {
throw new Error(`WASM error: ${error}`);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
throw new Error(errorMessage);
}
}
public async getMyProcesses(): Promise<string[] | null> {
// If we're not paired yet, just skip it
let pairingProcessId = null;
try {
pairingProcessId = this.getPairingProcessId();
} catch (e) {
return null;
}
if (!pairingProcessId) {
return null;
}
try {
const newMyProcesses = new Set<string>();
// MyProcesses automatically contains pairing process
newMyProcesses.add(pairingProcessId);
for (const [processId, process] of Object.entries(this.processes)) {
try {
const roles = this.getRoles(process);
if (roles && this.rolesContainsMember(roles, pairingProcessId)) {
newMyProcesses.add(processId);
}
} catch (e) {
console.error(e);
}
}
return Array.from(newMyProcesses);
} catch (e) {
console.error("Failed to get processes:", e);
return null;
}
}
@ -708,8 +746,11 @@ export class Service {
}
}
// Database method: Get All Processes
async getAllProcesses(): Promise<Record<string, any>> {
public getProcesses(): Map<string, Process> {
return this.processes;
}
async getAllProcessesFromDb(): Promise<Record<string, any>> {
try {
const db = await Database.getInstance();
const processes = await db.dumpStore('processes');
@ -844,6 +885,19 @@ export class Service {
return null;
}
public getRoles(process: Process): Record<string, RoleDefinition> | null {
const lastCommitedState = this.getLastCommitedState(process);
if (lastCommitedState && lastCommitedState.roles && Object.keys(lastCommitedState.roles).length != 0) {
return lastCommitedState!.roles;
} else if (process.states.length === 2) {
const firstState = process.states[0];
if (firstState && firstState.roles && Object.keys(firstState.roles).length != 0) {
return firstState!.roles;
}
}
return null;
}
public rolesContainsUs(roles: Record<string, RoleDefinition>): boolean {
let us;
try {
@ -964,6 +1018,23 @@ export class Service {
}
public async handleApiReturn(apiReturn: ApiReturn) {
// Check for errors in the returned objects
if (apiReturn.new_tx_to_send && apiReturn.new_tx_to_send.error) {
const error = apiReturn.new_tx_to_send.error;
const errorMessage = typeof error === 'object' && error !== null ?
(error as any).GenericError || JSON.stringify(error) :
String(error);
throw new Error(`Transaction error: ${errorMessage}`);
}
if (apiReturn.commit_to_send && apiReturn.commit_to_send.error) {
const error = apiReturn.commit_to_send.error;
const errorMessage = typeof error === 'object' && error !== null ?
(error as any).GenericError || JSON.stringify(error) :
String(error);
throw new Error(`Commit error: ${errorMessage}`);
}
if (apiReturn.partial_tx) {
try {
const res = wasm.sign_transaction(apiReturn.partial_tx);
@ -1091,6 +1162,77 @@ export class Service {
}
}
async decryptAttribute(processId: string, state: ProcessState, attribute: string): Promise<any | null> {
let hash = state.pcd_commitment[attribute];
if (!hash) {
// attribute doesn't exist
return null;
}
let key = state.keys[attribute];
const pairingProcessId = this.getPairingProcessId();
// If key is missing, request an update and then retry
if (!key) {
const roles = state.roles;
let hasAccess = false;
// If we're not supposed to have access to this attribute, ignore
for (const role of Object.values(roles)) {
for (const rule of Object.values(role.validation_rules)) {
if (rule.fields.includes(attribute)) {
if (role.members.includes(pairingProcessId)) {
// We have access to this attribute
hasAccess = true;
break;
}
}
}
}
if (!hasAccess) return null;
// We should have the key, so we're going to ask other members for it
await this.requestDataFromPeers(processId, [state.state_id], [state.roles]);
const maxRetries = 5;
const retryDelay = 500; // delay in milliseconds
let retries = 0;
while ((!hash || !key) && retries < maxRetries) {
await new Promise(resolve => setTimeout(resolve, retryDelay));
// Re-read hash and key after waiting
hash = state.pcd_commitment[attribute];
key = state.keys[attribute];
retries++;
}
}
if (hash && key) {
const blob = await this.getBlobFromDb(hash);
if (blob) {
// Decrypt the data
const buf = await blob.arrayBuffer();
const cipher = new Uint8Array(buf);
const keyUIntArray = this.hexToUInt8Array(key);
try {
const clear = wasm.decrypt_data(keyUIntArray, cipher);
if (clear) {
// deserialize the result to get the actual data
const decoded = wasm.decode_value(clear);
return decoded;
} else {
throw new Error('decrypt_data returned null');
}
} catch (e) {
console.error(`Failed to decrypt data: ${e}`);
}
}
}
return null;
}
decodeValue(value: number[]): any | null {
try {
return wasm.decode_value(new Uint8Array(value));

View File

@ -4,7 +4,6 @@ import { config } from './config';
import { Service } from './service';
import { ApiReturn, Process } from '../pkg/sdk_client';
import { EMPTY32BYTES } from './utils';
import { rust_zstd_wasm_shim_calloc } from '../pkg/sdk_client_bg.wasm';
interface ServerMessageEvent {
data: {
@ -65,7 +64,8 @@ class SimpleProcessHandlers {
res = await this.service.createPrdUpdate(processId, stateId);
await this.service.handleApiReturn(res);
} catch (e) {
throw new Error(e as string);
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
}
return {
@ -73,8 +73,10 @@ class SimpleProcessHandlers {
messageId: event.data.messageId
};
} catch (e) {
const errorMsg = `Failed to notify update for process: ${e}`;
throw new Error(errorMsg);
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
// Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
}
}
@ -100,7 +102,8 @@ class SimpleProcessHandlers {
res = await this.service.approveChange(processId, stateId);
await this.service.handleApiReturn(res);
} catch (e) {
throw new Error(e as string);
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
throw new Error(errorMessage);
}
return {
@ -109,8 +112,10 @@ class SimpleProcessHandlers {
messageId: event.data.messageId
};
} catch (e) {
const errorMsg = `Failed to validate process: ${e}`;
throw new Error(errorMsg);
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
// Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
}
}
@ -199,12 +204,81 @@ class SimpleProcessHandlers {
messageId: event.data.messageId
};
} catch (e) {
const errorMsg = `Failed to update process: ${e}`;
throw new Error(errorMsg);
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
// Remove redundant "Error:" prefix and simplify the message
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
throw new Error(cleanMessage);
}
}
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse > {
async handleGetMyProcesses(event: ServerMessageEvent): Promise<ServerResponse> {
if (event.data.type !== MessageType.GET_MY_PROCESSES) {
throw new Error('Invalid message type');
}
const processes = this.service.getProcesses();
const myProcesses = await this.service.getMyProcesses();
if (!myProcesses || myProcesses.length === 0) {
throw new Error('No my processes found');
}
const filteredProcesses: Record<string, Process> = {};
for (const processId of myProcesses) {
const process = processes.get(processId);
console.log(processId, ':', process);
if (process) {
filteredProcesses[processId] = process;
}
}
const data: Record<string, any> = {};
// Now we decrypt all we can in the processes
for (const [processId, process] of Object.entries(filteredProcesses)) {
// We also take the public data
const lastState = this.service.getLastCommitedState(process);
if (!lastState) {
console.error(`❌ Process ${processId} doesn't have a commited state`);
continue;
}
const processData: Record<string, any> = {};
for (const attribute of Object.keys(lastState.public_data)) {
try {
const value = this.service.decodeValue(lastState.public_data[attribute]);
if (value) {
processData[attribute] = value;
}
} catch (e) {
console.error(`❌ Error decoding public data ${attribute} for process ${processId}:`, e);
}
}
for (let i = process.states.length - 2; i >= 0; i--) {
const state = process.states[i];
for (const attribute of Object.keys(state.keys)) {
if (processData[attribute] !== undefined && processData[attribute] !== null) continue;
try {
const value = await this.service.decryptAttribute(processId, state, attribute);
if (value) {
processData[attribute] = value;
}
} catch (e) {
console.error(`❌ Error decrypting attribute ${attribute} for process ${processId}:`, e);
}
}
}
data[processId] = processData;
}
return {
type: MessageType.PROCESSES_RETRIEVED,
processes: filteredProcesses,
data,
messageId: event.data.messageId
};
}
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse> {
try {
switch (event.data.type) {
case MessageType.NOTIFY_UPDATE:
@ -213,11 +287,14 @@ class SimpleProcessHandlers {
return await this.handleValidateState(event);
case MessageType.UPDATE_PROCESS:
return await this.handleUpdateProcess(event);
case MessageType.GET_MY_PROCESSES:
return await this.handleGetMyProcesses(event);
default:
throw new Error(`Unhandled message type: ${event.data.type}`);
}
} catch (error) {
return this.errorResponse(error as string, event.clientId, event.data.messageId);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
return this.errorResponse(errorMessage, event.clientId, event.data.messageId);
}
}
}
@ -301,7 +378,11 @@ export class Server {
console.log('🔑 Already paired with id:', service.getPairingProcessId());
}
// Relays are automatically initialized in Service constructor
// Get all processes from database
await service.getAllProcessesFromDb();
// Connect to relays
await service.connectToRelaysAndWaitForHandshake();
console.log(`✅ Simple server running on port ${this.wss.options.port}`);
console.log('📋 Supported operations: UPDATE_PROCESS, NOTIFY_UPDATE, VALIDATE_STATE');
@ -341,9 +422,10 @@ export class Server {
this.sendToClient(ws, response);
} catch (error) {
console.error(`❌ Error handling message from ${clientId}:`, error);
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
this.sendToClient(ws, {
type: MessageType.ERROR,
error: `Server error: ${error instanceof Error ? error.message : String(error)}`,
error: `Server error: ${errorMessage}`,
messageId: JSON.parse(data.toString())?.messageId
});
}
@ -377,25 +459,50 @@ export class Server {
public shutdown() {
console.log('🛑 Shutting down server...');
// Close all active client connections first
for (const [ws, clientId] of this.clients.entries()) {
console.log(`🔌 Closing connection to ${clientId}...`);
ws.close(1000, 'Server shutting down');
}
this.clients.clear();
// Close the WebSocket server
this.wss.close(() => {
console.log('✅ Server shutdown complete');
process.exit(0);
});
// Force exit after a timeout if graceful shutdown fails
setTimeout(() => {
console.log('⚠️ Force shutdown after timeout');
process.exit(1);
}, 5000);
}
}
// Handle graceful shutdown
let isShuttingDown = false;
process.on('SIGINT', () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\n🛑 Received SIGINT, shutting down gracefully...');
if (server) {
server.shutdown();
} else {
process.exit(0);
}
});
process.on('SIGTERM', () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\n🛑 Received SIGTERM, shutting down gracefully...');
if (server) {
server.shutdown();
} else {
process.exit(0);
}
});