559 lines
19 KiB
TypeScript
559 lines
19 KiB
TypeScript
import WebSocket from 'ws';
|
|
import { MessageType } from './models';
|
|
import { config } from './config';
|
|
import { Service } from './service';
|
|
import { ApiReturn, Process } from '../pkg/sdk_client';
|
|
import { EMPTY32BYTES, splitPrivateData } from './utils';
|
|
|
|
interface ServerMessageEvent {
|
|
data: {
|
|
type: MessageType;
|
|
messageId?: string;
|
|
apiKey?: string;
|
|
[key: string]: any;
|
|
};
|
|
clientId: string;
|
|
}
|
|
|
|
interface ServerResponse {
|
|
type: MessageType;
|
|
messageId?: string;
|
|
[key: string]: any;
|
|
}
|
|
|
|
class SimpleProcessHandlers {
|
|
private apiKey: string;
|
|
private service: Service;
|
|
|
|
constructor(apiKey: string, service: Service) {
|
|
this.apiKey = apiKey;
|
|
this.service = service;
|
|
}
|
|
|
|
private errorResponse = (errorMsg: string, clientId: string, messageId?: string): ServerResponse => {
|
|
return {
|
|
type: MessageType.ERROR,
|
|
error: errorMsg,
|
|
messageId
|
|
};
|
|
};
|
|
|
|
private validateApiKey(apiKey: string): boolean {
|
|
return apiKey === this.apiKey;
|
|
}
|
|
|
|
async handleCreateProcess(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
if (event.data.type !== MessageType.CREATE_PROCESS) {
|
|
throw new Error('Invalid message type');
|
|
}
|
|
|
|
if (!this.service.isPaired()) {
|
|
throw new Error('Device not paired');
|
|
}
|
|
|
|
try {
|
|
const { processData, privateFields, roles, exclusionRules, apiKey } = event.data;
|
|
|
|
if (!apiKey || !this.validateApiKey(apiKey)) {
|
|
throw new Error('Invalid API key');
|
|
}
|
|
|
|
const { privateData, publicData } = splitPrivateData(processData, privateFields);
|
|
|
|
const createProcessReturn = await this.service.createProcess(privateData, publicData, roles);
|
|
if (!createProcessReturn.updated_process) {
|
|
throw new Error('Empty updated_process in createProcessReturn');
|
|
}
|
|
console.log('🚀 ~ handleCreateProcess ~ createProcessReturn:', createProcessReturn);
|
|
const processId = createProcessReturn.updated_process.process_id;
|
|
const process = createProcessReturn.updated_process.current_process;
|
|
await this.service.handleApiReturn(createProcessReturn);
|
|
|
|
const processCreated = {
|
|
processId,
|
|
process,
|
|
processData,
|
|
}
|
|
|
|
return {
|
|
type: MessageType.PROCESS_CREATED,
|
|
processCreated,
|
|
messageId: event.data.messageId
|
|
};
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
// Remove redundant "Error:" prefix and simplify the message
|
|
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
|
|
throw new Error(cleanMessage);
|
|
}
|
|
}
|
|
|
|
async handleNotifyUpdate(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
if (event.data.type !== MessageType.NOTIFY_UPDATE) {
|
|
throw new Error('Invalid message type');
|
|
}
|
|
|
|
try {
|
|
const { processId, stateId, apiKey } = event.data;
|
|
|
|
if (!apiKey || !this.validateApiKey(apiKey)) {
|
|
throw new Error('Invalid API key');
|
|
}
|
|
|
|
// Check if device is paired
|
|
if (!this.service.isPaired()) {
|
|
throw new Error('Device not paired');
|
|
}
|
|
|
|
let res: ApiReturn;
|
|
try {
|
|
res = await this.service.createPrdUpdate(processId, stateId);
|
|
await this.service.handleApiReturn(res);
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
throw new Error(errorMessage);
|
|
}
|
|
|
|
return {
|
|
type: MessageType.UPDATE_NOTIFIED,
|
|
messageId: event.data.messageId
|
|
};
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
// Remove redundant "Error:" prefix and simplify the message
|
|
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
|
|
throw new Error(cleanMessage);
|
|
}
|
|
}
|
|
|
|
async handleValidateState(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
if (event.data.type !== MessageType.VALIDATE_STATE) {
|
|
throw new Error('Invalid message type');
|
|
}
|
|
|
|
try {
|
|
const { processId, stateId, apiKey } = event.data;
|
|
|
|
if (!apiKey || !this.validateApiKey(apiKey)) {
|
|
throw new Error('Invalid API key');
|
|
}
|
|
|
|
// Check if device is paired
|
|
if (!this.service.isPaired()) {
|
|
throw new Error('Device not paired');
|
|
}
|
|
|
|
let res: ApiReturn;
|
|
try {
|
|
res = await this.service.approveChange(processId, stateId);
|
|
await this.service.handleApiReturn(res);
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
throw new Error(errorMessage);
|
|
}
|
|
|
|
return {
|
|
type: MessageType.STATE_VALIDATED,
|
|
validatedProcess: res.updated_process,
|
|
messageId: event.data.messageId
|
|
};
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
// Remove redundant "Error:" prefix and simplify the message
|
|
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
|
|
throw new Error(cleanMessage);
|
|
}
|
|
}
|
|
|
|
async handleUpdateProcess(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
if (event.data.type !== MessageType.UPDATE_PROCESS) {
|
|
throw new Error('Invalid message type');
|
|
}
|
|
|
|
if (!this.service.isPaired()) {
|
|
throw new Error('Device not paired');
|
|
}
|
|
|
|
try {
|
|
// privateFields is only used if newData contains new fields
|
|
// roles can be empty meaning that roles from the last commited state are kept
|
|
const { processId, newData, privateFields, roles, apiKey } = event.data;
|
|
|
|
if (!apiKey || !this.validateApiKey(apiKey)) {
|
|
throw new Error('Invalid API key');
|
|
}
|
|
|
|
// Check if the new data is already in the process or if it's a new field
|
|
const process = await this.service.getProcess(processId);
|
|
if (!process) {
|
|
throw new Error('Process not found');
|
|
}
|
|
const lastState = this.service.getLastCommitedState(process);
|
|
if (!lastState) {
|
|
throw new Error('Process doesn\'t have a commited state yet');
|
|
}
|
|
const lastStateIndex = this.service.getLastCommitedStateIndex(process);
|
|
if (lastStateIndex === null) {
|
|
throw new Error('Process doesn\'t have a commited state yet');
|
|
}
|
|
|
|
const privateData: Record<string, any> = {};
|
|
const publicData: Record<string, any> = {};
|
|
|
|
for (const field of Object.keys(newData)) {
|
|
// Public data are carried along each new state
|
|
// TODO I hope that at some point we stop doing that
|
|
// So the first thing we can do is check if the new data is public data
|
|
if (lastState.public_data[field]) {
|
|
// Add it to public data
|
|
publicData[field] = newData[field];
|
|
continue;
|
|
}
|
|
|
|
// If it's not a public data, it may be either a private data update, or a new field (public of private)
|
|
// Caller gave us a list of new private fields, if we see it here this is a new private field
|
|
if (privateFields.includes(field)) {
|
|
// Add it to private data
|
|
privateData[field] = newData[field];
|
|
continue;
|
|
}
|
|
|
|
// Now it can be an update of private data or a new public data
|
|
// We check that the field exists in previous states private data
|
|
for (let i = lastStateIndex; i >= 0; i--) {
|
|
const state = process.states[i];
|
|
if (state.pcd_commitment[field]) {
|
|
// We don't even check if it's a public field, we would have seen it in the last state
|
|
// TODO maybe that's an issue if we remove a public field at some point? That's not explicitly forbidden but not really supported yet
|
|
privateData[field] = newData[field];
|
|
break;
|
|
} else {
|
|
// This attribute was not modified in that state, we go back to the previous state
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (privateData[field]) continue;
|
|
|
|
// We've get back all the way to the first state without seeing it, it's a new public field
|
|
publicData[field] = newData[field];
|
|
}
|
|
|
|
// We'll let the wasm check if roles are consistent
|
|
|
|
const res = await this.service.updateProcess(process, privateData, publicData, roles);
|
|
await this.service.handleApiReturn(res);
|
|
|
|
return {
|
|
type: MessageType.PROCESS_UPDATED,
|
|
updatedProcess: res.updated_process,
|
|
messageId: event.data.messageId
|
|
};
|
|
} catch (e) {
|
|
const errorMessage = e instanceof Error ? e.message : String(e || 'Unknown error');
|
|
// Remove redundant "Error:" prefix and simplify the message
|
|
const cleanMessage = errorMessage.replace(/^Error:\s*/, '');
|
|
throw new Error(cleanMessage);
|
|
}
|
|
}
|
|
|
|
async handleGetMyProcesses(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
if (event.data.type !== MessageType.GET_MY_PROCESSES) {
|
|
throw new Error('Invalid message type');
|
|
}
|
|
|
|
const processes = this.service.getProcesses();
|
|
const myProcesses = await this.service.getMyProcesses();
|
|
|
|
if (!myProcesses || myProcesses.length === 0) {
|
|
throw new Error('No my processes found');
|
|
}
|
|
|
|
const filteredProcesses: Record<string, Process> = {};
|
|
for (const processId of myProcesses) {
|
|
const process = processes.get(processId);
|
|
console.log(processId, ':', process);
|
|
|
|
if (process) {
|
|
filteredProcesses[processId] = process;
|
|
}
|
|
}
|
|
|
|
const data: Record<string, any> = {};
|
|
// Now we decrypt all we can in the processes
|
|
for (const [processId, process] of Object.entries(filteredProcesses)) {
|
|
// We also take the public data
|
|
const lastState = this.service.getLastCommitedState(process);
|
|
if (!lastState) {
|
|
console.error(`❌ Process ${processId} doesn't have a commited state`);
|
|
continue;
|
|
}
|
|
const processData: Record<string, any> = {};
|
|
for (const attribute of Object.keys(lastState.public_data)) {
|
|
try {
|
|
const value = this.service.decodeValue(lastState.public_data[attribute]);
|
|
if (value) {
|
|
processData[attribute] = value;
|
|
}
|
|
} catch (e) {
|
|
console.error(`❌ Error decoding public data ${attribute} for process ${processId}:`, e);
|
|
}
|
|
}
|
|
for (let i = process.states.length - 2; i >= 0; i--) {
|
|
const state = process.states[i];
|
|
for (const attribute of Object.keys(state.keys)) {
|
|
if (processData[attribute] !== undefined && processData[attribute] !== null) continue;
|
|
try {
|
|
const value = await this.service.decryptAttribute(processId, state, attribute);
|
|
if (value) {
|
|
processData[attribute] = value;
|
|
}
|
|
} catch (e) {
|
|
console.error(`❌ Error decrypting attribute ${attribute} for process ${processId}:`, e);
|
|
}
|
|
}
|
|
}
|
|
data[processId] = processData;
|
|
}
|
|
|
|
return {
|
|
type: MessageType.PROCESSES_RETRIEVED,
|
|
processes: filteredProcesses,
|
|
data,
|
|
messageId: event.data.messageId
|
|
};
|
|
}
|
|
|
|
async handleMessage(event: ServerMessageEvent): Promise<ServerResponse> {
|
|
try {
|
|
switch (event.data.type) {
|
|
case MessageType.CREATE_PROCESS:
|
|
return await this.handleCreateProcess(event);
|
|
case MessageType.NOTIFY_UPDATE:
|
|
return await this.handleNotifyUpdate(event);
|
|
case MessageType.VALIDATE_STATE:
|
|
return await this.handleValidateState(event);
|
|
case MessageType.UPDATE_PROCESS:
|
|
return await this.handleUpdateProcess(event);
|
|
case MessageType.GET_MY_PROCESSES:
|
|
return await this.handleGetMyProcesses(event);
|
|
default:
|
|
throw new Error(`Unhandled message type: ${event.data.type}`);
|
|
}
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
|
|
return this.errorResponse(errorMessage, event.clientId, event.data.messageId);
|
|
}
|
|
}
|
|
}
|
|
|
|
export class Server {
|
|
private wss: WebSocket.Server;
|
|
private handlers!: SimpleProcessHandlers;
|
|
private clients: Map<WebSocket, string> = new Map();
|
|
|
|
constructor(port: number = 9090) {
|
|
this.wss = new WebSocket.Server({ port });
|
|
this.init();
|
|
}
|
|
|
|
private async init() {
|
|
try {
|
|
console.log('🚀 Initializing Simple 4NK Protocol Server...');
|
|
|
|
// Initialize service
|
|
const service = Service.getInstance();
|
|
|
|
// Initialize handlers with API key and service
|
|
this.handlers = new SimpleProcessHandlers(config.apiKey, service);
|
|
|
|
// Setup WebSocket handlers
|
|
this.setupWebSocketHandlers();
|
|
|
|
// Check if we have a device
|
|
const hasDevice = await service.hasDevice();
|
|
if (!hasDevice) {
|
|
const spAddress = await service.createNewDevice();
|
|
console.log('🔑 New device created:', spAddress);
|
|
} else {
|
|
console.log('🔑 Device found, restoring from database...');
|
|
const device = await service.getDeviceFromDatabase();
|
|
const metadata = await service.getDeviceMetadata();
|
|
|
|
if (device) {
|
|
await service.restoreDeviceFromDatabase(device);
|
|
console.log('🔑 Device restored successfully');
|
|
if (metadata) {
|
|
console.log(`📋 Device info: ${metadata.device_address} (created: ${metadata.created_at})`);
|
|
}
|
|
} else {
|
|
console.error('❌ Failed to retrieve device from database');
|
|
}
|
|
}
|
|
|
|
// Check if we are paired
|
|
if (!service.isPaired()) {
|
|
console.log('🔑 Not paired, creating pairing process...');
|
|
try {
|
|
const pairingResult = await service.createPairingProcess('', []);
|
|
const processId: string = pairingResult.updated_process?.process_id;
|
|
const stateId = pairingResult.updated_process?.current_process?.states[0].state_id;
|
|
if (!processId || !stateId) {
|
|
throw new Error('Failed to get process id or state id');
|
|
}
|
|
await service.handleApiReturn(pairingResult);
|
|
const udpateResult = await service.createPrdUpdate(processId, stateId);
|
|
await service.handleApiReturn(udpateResult);
|
|
const approveResult = await service.approveChange(processId, stateId);
|
|
await service.handleApiReturn(approveResult);
|
|
|
|
// now pair the device
|
|
service.pairDevice(processId, [service.getDeviceAddress()]);
|
|
|
|
// Update the device in the database
|
|
const device = service.dumpDeviceFromMemory();
|
|
if (device) {
|
|
await service.restoreDeviceFromDatabase(device);
|
|
} else {
|
|
throw new Error('Failed to dump device from wasm');
|
|
}
|
|
|
|
console.log('🔑 Pairing process created successfully');
|
|
} catch (error) {
|
|
console.error('❌ Failed to create pairing process:', error);
|
|
}
|
|
} else {
|
|
console.log('🔑 Already paired with id:', service.getPairingProcessId());
|
|
}
|
|
|
|
// Get all processes from database
|
|
await service.getAllProcessesFromDb();
|
|
|
|
// Connect to relays
|
|
await service.connectToRelaysAndWaitForHandshake();
|
|
|
|
console.log(`✅ Simple server running on port ${this.wss.options.port}`);
|
|
console.log('📋 Supported operations: UPDATE_PROCESS, NOTIFY_UPDATE, VALIDATE_STATE');
|
|
console.log('🔑 Authentication: API key required for all operations');
|
|
console.log('🔧 Services: Integrated with SimpleService protocol logic');
|
|
|
|
} catch (error) {
|
|
console.error('❌ Failed to initialize server:', error);
|
|
process.exit(1);
|
|
}
|
|
}
|
|
|
|
private setupWebSocketHandlers() {
|
|
this.wss.on('connection', (ws: WebSocket, req) => {
|
|
const clientId = this.generateClientId();
|
|
this.clients.set(ws, clientId);
|
|
|
|
console.log(`🔗 Client connected: ${clientId} from ${req.socket.remoteAddress}`);
|
|
|
|
// Send listening message
|
|
this.sendToClient(ws, {
|
|
type: MessageType.LISTENING,
|
|
clientId
|
|
});
|
|
|
|
ws.on('message', async (data: WebSocket.Data) => {
|
|
try {
|
|
const message = JSON.parse(data.toString());
|
|
console.log(`📨 Received message from ${clientId}:`, message.type);
|
|
|
|
const serverEvent: ServerMessageEvent = {
|
|
data: message,
|
|
clientId
|
|
};
|
|
|
|
const response = await this.handlers.handleMessage(serverEvent);
|
|
this.sendToClient(ws, response);
|
|
} catch (error) {
|
|
console.error(`❌ Error handling message from ${clientId}:`, error);
|
|
const errorMessage = error instanceof Error ? error.message : String(error || 'Unknown error');
|
|
this.sendToClient(ws, {
|
|
type: MessageType.ERROR,
|
|
error: `Server error: ${errorMessage}`,
|
|
messageId: JSON.parse(data.toString())?.messageId
|
|
});
|
|
}
|
|
});
|
|
|
|
ws.on('close', () => {
|
|
console.log(`🔌 Client disconnected: ${clientId}`);
|
|
this.clients.delete(ws);
|
|
});
|
|
|
|
ws.on('error', (error) => {
|
|
console.error(`❌ WebSocket error for ${clientId}:`, error);
|
|
this.clients.delete(ws);
|
|
});
|
|
});
|
|
|
|
this.wss.on('error', (error) => {
|
|
console.error('❌ WebSocket server error:', error);
|
|
});
|
|
}
|
|
|
|
private sendToClient(ws: WebSocket, response: ServerResponse) {
|
|
if (ws.readyState === WebSocket.OPEN) {
|
|
ws.send(JSON.stringify(response));
|
|
}
|
|
}
|
|
|
|
private generateClientId(): string {
|
|
return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
|
}
|
|
|
|
public shutdown() {
|
|
console.log('🛑 Shutting down server...');
|
|
|
|
// Close all active client connections first
|
|
for (const [ws, clientId] of this.clients.entries()) {
|
|
console.log(`🔌 Closing connection to ${clientId}...`);
|
|
ws.close(1000, 'Server shutting down');
|
|
}
|
|
this.clients.clear();
|
|
|
|
// Close the WebSocket server
|
|
this.wss.close(() => {
|
|
console.log('✅ Server shutdown complete');
|
|
process.exit(0);
|
|
});
|
|
|
|
// Force exit after a timeout if graceful shutdown fails
|
|
setTimeout(() => {
|
|
console.log('⚠️ Force shutdown after timeout');
|
|
process.exit(1);
|
|
}, 5000);
|
|
}
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
let isShuttingDown = false;
|
|
|
|
process.on('SIGINT', () => {
|
|
if (isShuttingDown) return;
|
|
isShuttingDown = true;
|
|
console.log('\n🛑 Received SIGINT, shutting down gracefully...');
|
|
if (server) {
|
|
server.shutdown();
|
|
} else {
|
|
process.exit(0);
|
|
}
|
|
});
|
|
|
|
process.on('SIGTERM', () => {
|
|
if (isShuttingDown) return;
|
|
isShuttingDown = true;
|
|
console.log('\n🛑 Received SIGTERM, shutting down gracefully...');
|
|
if (server) {
|
|
server.shutdown();
|
|
} else {
|
|
process.exit(0);
|
|
}
|
|
});
|
|
|
|
// Start the server
|
|
const port = parseInt(process.env.PORT || '9090');
|
|
const server = new Server(port);
|