Compare commits
7 Commits
93eb637f1c
...
6625771830
Author | SHA1 | Date | |
---|---|---|---|
![]() |
6625771830 | ||
![]() |
e320cfa193 | ||
![]() |
80dc42bbe6 | ||
![]() |
6569686634 | ||
![]() |
77e3dfc29c | ||
![]() |
a2ae855c10 | ||
![]() |
c3455ac888 |
@ -60,19 +60,27 @@ export default class Database {
|
||||
}
|
||||
|
||||
private parseKey(fullKey: string): { storeName: string; key: string } | null {
|
||||
const parts = fullKey.split(':', 2);
|
||||
if (parts.length !== 2) return null;
|
||||
return { storeName: parts[0], key: parts[1] };
|
||||
const colonIndex = fullKey.indexOf(':');
|
||||
if (colonIndex === -1) return null;
|
||||
|
||||
const storeName = fullKey.substring(0, colonIndex);
|
||||
const key = fullKey.substring(colonIndex + 1);
|
||||
|
||||
return { storeName, key };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single object from a store
|
||||
* O(log n) operation - only reads specific key
|
||||
*/
|
||||
public async getObject(storeName: string, key: string): Promise<any | null> {
|
||||
public async getObject(storeName: string, key: string, isBuffer: boolean = false): Promise<any | null> {
|
||||
try {
|
||||
const fullKey = this.getKey(storeName, key);
|
||||
if (isBuffer) {
|
||||
return await this.db.get(fullKey, { valueEncoding: 'buffer' });
|
||||
} else {
|
||||
return await this.db.get(fullKey);
|
||||
}
|
||||
} catch (error) {
|
||||
if ((error as any).code === 'LEVEL_NOT_FOUND') {
|
||||
return null;
|
||||
@ -85,12 +93,16 @@ export default class Database {
|
||||
* Add or update an object in a store
|
||||
* O(log n) operation - only writes specific key-value pair
|
||||
*/
|
||||
public async addObject(operation: DatabaseObject): Promise<void> {
|
||||
public async addObject(operation: DatabaseObject, isBuffer: boolean = false): Promise<void> {
|
||||
const { storeName, object, key } = operation;
|
||||
|
||||
if (key) {
|
||||
const fullKey = this.getKey(storeName, key);
|
||||
if (isBuffer) {
|
||||
await this.db.put(fullKey, object, { valueEncoding: 'buffer' });
|
||||
} else {
|
||||
await this.db.put(fullKey, object);
|
||||
}
|
||||
} else {
|
||||
// Auto-generate key if none provided
|
||||
const autoKey = Date.now().toString() + Math.random().toString(36).substr(2, 9);
|
||||
|
@ -80,57 +80,50 @@ export class Service {
|
||||
const existing = await this.getProcess(processId);
|
||||
if (existing) {
|
||||
// Look for state id we don't know yet
|
||||
let new_states = [];
|
||||
let roles = [];
|
||||
let newStates: string[] = [];
|
||||
let newRoles: Record<string, RoleDefinition>[] = [];
|
||||
for (const state of process.states) {
|
||||
if (!state.state_id || state.state_id === EMPTY32BYTES) { continue; }
|
||||
if (!this.lookForStateId(existing, state.state_id)) {
|
||||
if (!state || !state.state_id) { continue; } // shouldn't happen
|
||||
if (state.state_id === EMPTY32BYTES) {
|
||||
// We check that the tip is the same we have, if not we update
|
||||
const existingTip = existing.states[existing.states.length - 1].commited_in;
|
||||
if (existingTip !== state.commited_in) {
|
||||
console.log('Found new tip for process', processId);
|
||||
existing.states.pop(); // We discard the last state
|
||||
existing.states.push(state);
|
||||
// We know that's the last state, so we just trigger the update
|
||||
toSave[processId] = existing;
|
||||
}
|
||||
} else if (!this.lookForStateId(existing, state.state_id)) {
|
||||
// We don't want to overwrite what we already have for existing processes
|
||||
// We may end up overwriting the keys for example
|
||||
// So the process we're going to save needs to merge new states with what we already have
|
||||
const existingLastState = existing.states.pop();
|
||||
existing.states.push(state);
|
||||
existing.states.push(existingLastState);
|
||||
toSave[processId] = existing; // We mark it for update
|
||||
if (this.rolesContainsUs(state.roles)) {
|
||||
new_states.push(state.state_id);
|
||||
roles.push(state.roles);
|
||||
newStates.push(state.state_id);
|
||||
newRoles.push(state.roles);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (new_states.length != 0) {
|
||||
// We request the new states
|
||||
await this.requestDataFromPeers(processId, new_states, roles);
|
||||
toSave[processId] = process;
|
||||
if (newStates.length != 0) {
|
||||
await this.requestDataFromPeers(processId, newStates, newRoles);
|
||||
}
|
||||
|
||||
// Just to be sure check if that's a pairing process
|
||||
const lastCommitedState = this.getLastCommitedState(process);
|
||||
if (lastCommitedState && lastCommitedState.public_data && lastCommitedState.public_data['pairedAddresses']) {
|
||||
// This is a pairing process
|
||||
try {
|
||||
const pairedAddresses = this.decodeValue(lastCommitedState.public_data['pairedAddresses'] as unknown as number[]);
|
||||
// Are we part of it?
|
||||
if (pairedAddresses && pairedAddresses.length > 0 && pairedAddresses.includes(this.getDeviceAddress())) {
|
||||
// We save the process to db
|
||||
await this.saveProcessToDb(processId, process as Process);
|
||||
// We update the device
|
||||
await this.updateDevice();
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Failed to check for pairing process:', e);
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise we're probably just in the initial loading at page initialization
|
||||
|
||||
// We may learn an update for this process
|
||||
// TODO maybe actually check if what the relay is sending us contains more information than what we have
|
||||
// relay should always have more info than us, but we never know
|
||||
// For now let's keep it simple and let the worker do the job
|
||||
} else {
|
||||
// We add it to db
|
||||
console.log(`Saving ${processId} to db`);
|
||||
toSave[processId] = process;
|
||||
}
|
||||
}
|
||||
|
||||
if (toSave && Object.keys(toSave).length > 0) {
|
||||
console.log('batch saving processes to db', toSave);
|
||||
await this.batchSaveProcessesToDb(toSave);
|
||||
}
|
||||
}
|
||||
}, 500)
|
||||
} catch (e) {
|
||||
console.error('Failed to parse init message:', e);
|
||||
@ -716,7 +709,7 @@ export class Service {
|
||||
for (const attribute of Object.keys(lastState.public_data)) {
|
||||
try {
|
||||
const value = this.decodeValue(lastState.public_data[attribute]);
|
||||
if (value) {
|
||||
if (value !== null && value !== undefined) {
|
||||
processData[attribute] = value;
|
||||
}
|
||||
} catch (e) {
|
||||
@ -992,31 +985,31 @@ export class Service {
|
||||
}
|
||||
|
||||
// Blob and data storage methods
|
||||
async saveBlobToDb(hash: string, data: Blob) {
|
||||
async saveBufferToDb(hash: string, data: Buffer) {
|
||||
const db = await Database.getInstance();
|
||||
try {
|
||||
await db.addObject({
|
||||
storeName: 'data',
|
||||
object: data,
|
||||
key: hash,
|
||||
});
|
||||
}, true);
|
||||
} catch (e) {
|
||||
console.error(`Failed to save data to db: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
async getBlobFromDb(hash: string): Promise<Blob | null> {
|
||||
async getBufferFromDb(hash: string): Promise<Buffer | null> {
|
||||
const db = await Database.getInstance();
|
||||
try {
|
||||
return await db.getObject('data', hash);
|
||||
return await db.getObject('data', hash, true);
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async saveDataToStorage(hash: string, data: Blob, ttl: number | null) {
|
||||
async saveDataToStorage(hash: string, data: Buffer, ttl: number | null) {
|
||||
console.log('💾 Saving data to storage:', hash);
|
||||
// TODO: Implement actual storage service
|
||||
console.debug('TODO');
|
||||
// const storages = [STORAGEURL];
|
||||
// try {
|
||||
// await storeData(storages, hash, data, ttl);
|
||||
@ -1057,6 +1050,10 @@ export class Service {
|
||||
return uint8Array;
|
||||
}
|
||||
|
||||
hexToBuffer(hexString: string): Buffer {
|
||||
return Buffer.from(this.hexToUInt8Array(hexString));
|
||||
}
|
||||
|
||||
public async handleApiReturn(apiReturn: ApiReturn) {
|
||||
// Check for errors in the returned objects
|
||||
if (apiReturn.new_tx_to_send && apiReturn.new_tx_to_send.error) {
|
||||
@ -1121,9 +1118,9 @@ export class Service {
|
||||
|
||||
if (updatedProcess.encrypted_data && Object.keys(updatedProcess.encrypted_data).length != 0) {
|
||||
for (const [hash, cipher] of Object.entries(updatedProcess.encrypted_data)) {
|
||||
const blob = this.hexToBlob(cipher);
|
||||
const buffer = this.hexToBuffer(cipher);
|
||||
try {
|
||||
await this.saveBlobToDb(hash, blob);
|
||||
await this.saveBufferToDb(hash, buffer);
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
@ -1144,9 +1141,9 @@ export class Service {
|
||||
|
||||
if (apiReturn.push_to_storage && apiReturn.push_to_storage.length != 0) {
|
||||
for (const hash of apiReturn.push_to_storage) {
|
||||
const blob = await this.getBlobFromDb(hash);
|
||||
if (blob) {
|
||||
await this.saveDataToStorage(hash, blob, null);
|
||||
const buffer = await this.getBufferFromDb(hash);
|
||||
if (buffer) {
|
||||
await this.saveDataToStorage(hash, buffer, null);
|
||||
} else {
|
||||
console.error('Failed to get data from db');
|
||||
}
|
||||
@ -1247,11 +1244,10 @@ export class Service {
|
||||
}
|
||||
|
||||
if (hash && key) {
|
||||
const blob = await this.getBlobFromDb(hash);
|
||||
if (blob) {
|
||||
const buffer = await this.getBufferFromDb(hash);
|
||||
if (buffer) {
|
||||
// Decrypt the data
|
||||
const buf = await blob.arrayBuffer();
|
||||
const cipher = new Uint8Array(buf);
|
||||
const cipher = new Uint8Array(buffer);
|
||||
|
||||
const keyUIntArray = this.hexToUInt8Array(key);
|
||||
|
||||
|
@ -366,15 +366,14 @@ export class Server {
|
||||
if (!processId || !stateId) {
|
||||
throw new Error('Failed to get process id or state id');
|
||||
}
|
||||
// now pair the device before continuing
|
||||
service.pairDevice(processId, [service.getDeviceAddress()]);
|
||||
await service.handleApiReturn(pairingResult);
|
||||
const udpateResult = await service.createPrdUpdate(processId, stateId);
|
||||
await service.handleApiReturn(udpateResult);
|
||||
const approveResult = await service.approveChange(processId, stateId);
|
||||
await service.handleApiReturn(approveResult);
|
||||
|
||||
// now pair the device
|
||||
service.pairDevice(processId, [service.getDeviceAddress()]);
|
||||
|
||||
// Update the device in the database
|
||||
const device = service.dumpDeviceFromMemory();
|
||||
if (device) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user