diff --git a/src/pages/chat/chat.ts b/src/pages/chat/chat.ts index 0bb6d4d..646de09 100755 --- a/src/pages/chat/chat.ts +++ b/src/pages/chat/chat.ts @@ -335,10 +335,12 @@ class ChatElement extends HTMLElement { newState.message = message; + console.log(`Creating state ${newState}`); // Now we create a new state for the dm process - const apiReturn = service.updateProcess(this.processId, newState); + const apiReturn = await service.updateProcess(this.processId, newState); const updatedProcess = apiReturn.updated_process.current_process; const newStateId = updatedProcess.states[updatedProcess.states.length - 2 ].state_id; // We take the last concurrent state, just before the tip + console.log(`newStateId: ${newStateId}`); await service.handleApiReturn(apiReturn); const createPrdReturn = service.createPrdUpdate(this.processId, newStateId); @@ -347,6 +349,8 @@ class ChatElement extends HTMLElement { // Now we validate the new state const approveChangeReturn = service.approveChange(this.processId, newStateId); await service.handleApiReturn(approveChangeReturn); + + await this.loadMemberChat(this.selectedMember); } catch (error) { console.error('❌ Error in sendMessage:', error); } @@ -482,7 +486,6 @@ class ChatElement extends HTMLElement { const recipientAddresses = memberAddresses.sp_addresses; for (const [processId, process] of Object.entries(processes)) { const description = await service.getDescription(processId, process); - // console.log('Process description:', description); if (description !== "dm") { continue; } @@ -555,10 +558,10 @@ class ChatElement extends HTMLElement { return; } - // while (dmProcessId === null) { - // dmProcessId = await this.lookForDmProcess(); - // await new Promise(r => setTimeout(r, 1000)); - // } + while (dmProcessId === null) { + dmProcessId = await this.lookForDmProcess(); + await new Promise(r => setTimeout(r, 1000)); + } } else { console.log('Found DM process', dmProcessId); this.processId = dmProcessId; @@ -614,10 +617,9 @@ class ChatElement extends HTMLElement { const messageContent = document.createElement('div'); messageContent.className = 'message user'; - // console.log("SENDER: ", message.metadata.sender); const pairingProcess = await this.getMyProcessId(); const senderEmoji = await addressToEmoji(pairingProcess); - + if (message.type === 'file') { let fileContent = ''; if (message.content.type.startsWith('image/')) { @@ -934,8 +936,6 @@ class ChatElement extends HTMLElement { return null; } - console.log('lastDifferentState (roles):', lastDifferentState); - // Take the roles out of the state const roles = lastDifferentState!.pcd_commitment['roles']; if (roles) { @@ -943,8 +943,6 @@ class ChatElement extends HTMLElement { if (userDiff) { console.log("Successfully retrieved userDiff:", userDiff); return userDiff.new_value; - } else { - console.log("Failed to retrieve a non-null userDiff."); } } @@ -1428,8 +1426,7 @@ class ChatElement extends HTMLElement { } const hasCurrentUser = Object.values(roles).some(role => - (role as { members: { sp_addresses: string[] }[] }).members - .some(member => member.sp_addresses.includes(currentMember[0])) + this.rolesContainsUs(role) ); if (hasCurrentUser) { diff --git a/src/services/service.ts b/src/services/service.ts index b0ba574..e01f139 100755 --- a/src/services/service.ts +++ b/src/services/service.ts @@ -394,7 +394,20 @@ export default class Services { } } - public updateProcess(processId: string, new_state: any): ApiReturn { + public async updateProcess(processId: string, new_state: any): Promise { + const roles = new_state.roles; + if (!roles) { + throw new Error('new state doesn\'t contain roles'); + } + + let members = new Set(); + for (const role of Object.values(roles)) { + for (const member of role.members) { + members.add(member) + } + } + console.log(members); + await this.checkConnections([...members]); try { return this.sdkClient.update_process(processId, JSON.stringify(new_state)); } catch (e) { @@ -510,6 +523,28 @@ export default class Services { } } + private async getCipherForDiff(diff: UserDiff): Promise { + // get the process + try { + const process = await this.getProcess(diff.process_id); + } catch (e) { + console.error('Failed to get process:', e); + return null; + } + const state = process.states.find(state => state.state_id === diff.state_id); + if (state) { + // Now we return the encrypted value for that field + const cipher = state.encrypted_pcd[diff.field]; + if (cipher) { + return cipher; + } else { + console.error('Failed to get encrypted value'); + } + } + + return null; + } + public async tryFetchDiffValue(diffs: UserDiff[]): Promise<[UserDiff[], Record]>{ if (diffs.length === 0) { return [[], {}]; @@ -518,12 +553,15 @@ export default class Services { // We check if we have the value in diffs let retrievedValues: Record = {}; for (const diff of diffs) { + const hash = diff.value_commitment; + if (!hash) { + console.error('No commitment for diff'); + continue; + } + + const value = diff.new_value; // Check if `new_value` is missing - if (diff.new_value === null) { - const hash = diff.value_commitment; - if (!hash) { - console.error('No commitment for diff'); - } + if (value === null) { try { const res = await this.fetchValueFromStorage(hash); if (!res) { @@ -535,6 +573,36 @@ export default class Services { } catch (error) { console.error(`Failed to fetch new_value for diff: ${JSON.stringify(diff)}`, error); } + } else { + // We should have it in db if it came from the wasm, but just in case + try { + await this.saveDiffsToDb(diff); + } catch (e) { + console.error(`Failed to save diff to db: ${e}`); + } + + // We already have this value, so we check if it's on storage and push it if not + const onStorage = this.testDataInStorage(hash); + if (onStorage === null) { + console.error('Failed to test data presence in storage'); + continue; + } + if (!onStorage) { + // We push the encrypted data on storage with default ttl + // We need to take the encrypted data from the state + const cipher = await getCipherForDiff(diff); + if (cipher) { + try { + await this.saveDataToStorage(hash, cipher, null); + } catch (e) { + console.error(`Failed to save to storage: ${e}`); + } + } + } else { + // We could pump the ttl here + // for now, do nothing + continue; + } } } @@ -580,19 +648,19 @@ export default class Services { if (apiReturn.updated_process) { const updatedProcess = apiReturn.updated_process; - const processId: string = updatedProcess.commitment_tx; + const processId: string = updatedProcess.process_id; - // Save process to storage + // Save process to db try { - await this.saveProcess(processId, updatedProcess.current_process); + await this.saveProcessToDb(processId, updatedProcess.current_process); } catch (e) { throw e; } const isPaired = this.isPaired(); - if (updatedProcess.new_diffs.length != 0) { - const [updatedDiffs, retrievedValues] = await this.tryFetchDiffValue(updatedProcess.new_diffs); + if (updatedProcess.diffs && updatedProcess.diffs.length != 0) { + const [updatedDiffs, retrievedValues] = await this.tryFetchDiffValue(updatedProcess.diffs); if (Object.entries(retrievedValues).length != 0) { const stateId = updatedDiffs[0].state_id; const processId = updatedDiffs[0].process_id; @@ -602,7 +670,7 @@ export default class Services { await this.handleApiReturn(apiReturn); } else { try { - await this.saveDiffs(updatedDiffs); + await this.saveDiffsToDb(updatedDiffs); } catch (e) { throw e; } @@ -759,6 +827,15 @@ export default class Services { return true; } + membersInSameRoleThanUs(roles: any): Member[] | null { + try { + return this.sdkClient.members_in_same_roles_me(JSON.stringify(roles)); + } catch (e) { + console.error(e); + return null; + } + } + async dumpWallet() { const wallet = await this.sdkClient.dump_wallet(); console.log('🚀 ~ Services ~ dumpWallet ~ wallet:', wallet); @@ -794,18 +871,31 @@ export default class Services { } } - public async saveProcess(commitedIn: string, process: Process) { + private async removeProcess(processId: string): Promise { + const db = await Database.getInstance(); + const storeName = 'processes'; + + try { + await db.deleteObject(storeName, processId); + } catch (e) { + console.error(e); + } + } + + public async saveProcessToDb(processId: string, process: Process) { const db = await Database.getInstance(); try { await db.addObject({ storeName: 'processes', object: process, - key: commitedIn, + key: processId, }); } catch (e) { throw new Error(`Failed to save process: ${e}`); } + } + public async saveStatesToStorage(process: Process, state_ids: string[]) { // We check how many copies in storage nodes // We check the storage nodes in the process itself // this.sdkClient.get_storages(commitedIn); @@ -819,21 +909,39 @@ export default class Services { console.warn('Empty encrypted pcd, skipping...'); continue; } - for (const [field, hash] of Object.entries(state.pcd_commitment)) { - // get the encrypted value with the field name - const value = state.encrypted_pcd[field]; - await storeData(storages, hash, value, null); + if (state_ids.includes(state.state_id)) { + for (const [field, hash] of Object.entries(state.pcd_commitment)) { + // get the encrypted value with the field name + const value = state.encrypted_pcd[field]; + await storeData(storages, hash, value, null); + } } } } + public async saveDataToStorage(hash: string, data: string, ttl: number | null) { + const storages = [storageUrl]; + + try { + await storeData(storages, hash, value, ttl); + } catch (e) { + console.error(`Failed to store data with hash ${hash}: ${e}`); + } + } + public async fetchValueFromStorage(hash: string): Promise { const storages = [storageUrl]; return await retrieveData(storages, hash); } - public async saveDiffs(diffs: UserDiff[]) { + public async testDataInStorage(hash: string): Promise { + const storages = [storageUrl]; + + return await testData(storages, hash); + } + + public async saveDiffsToDb(diffs: UserDiff[]) { const db = await Database.getInstance(); try { for (const diff of diffs) { @@ -890,20 +998,14 @@ export default class Services { await this.restoreProcessesFromDB(); } + // Match what we get from relay against what we already know and fetch missing data public async updateProcessesFromRelay(processes: Record) { const db = await Database.getInstance(); for (const [processId, process] of Object.entries(processes)) { - const processFromDb = await this.getProcess(processId); - if (!processFromDb) { - console.log(`Unknown process ${processId}, adding to db`); - await db.addObject({ storeName: 'processes', object: process, key: processId}); - } else { - const missingStates = process.states.length - processFromDb.states.length; - if (missingStates < 0) { - // We are missing one or more states for a known process - // TODO send a request to process managers to get the missing states - console.log(`Missing ${missingStates} for process ${processId}`) - } + try { + this.sdkClient.sync_process_from_relay(processId, JSON.stringify(process)); + } catch (e) { + console.error(e); } } } @@ -976,7 +1078,6 @@ export default class Services { const service = await Services.getInstance(); // Get the `commited_in` value of the last state and remove it from the array const currentCommitedIn = process.states.at(-1)?.commited_in; - console.log('Current CommitedIn:' currentCommitedIn) if (currentCommitedIn === undefined) { return null; // No states available @@ -1002,10 +1103,7 @@ export default class Services { if (description) { const userDiff = await service.getDiffByValue(description); if (userDiff) { - console.log("Successfully retrieved userDiff:", userDiff); return userDiff.new_value; - } else { - console.log("Failed to retrieve a non-null userDiff."); } } @@ -1097,6 +1195,7 @@ export default class Services { } public async handleHandshakeMsg(url: string, parsedMsg: any) { + const us = this try { const handshakeMsg: HandshakeMessage = JSON.parse(parsedMsg); this.updateRelay(url, handshakeMsg.sp_address); @@ -1104,6 +1203,8 @@ export default class Services { if (this.membersList && Object.keys(this.membersList).length === 0) { this.membersList = handshakeMsg.peers_list; } else { + // console.log('Received members:'); + // console.log(handshakeMsg.peers_list); for (const [processId, member] of Object.entries(handshakeMsg.peers_list)) { this.membersList[processId] = member; } @@ -1112,6 +1213,43 @@ export default class Services { setTimeout(async () => { const newProcesses = handshakeMsg.processes_list; if (newProcesses && Object.keys(newProcesses).length !== 0) { + for (const [processId, process] of Object.entries(newProcesses)) { + // We check if we're part of the process + if (process.states.length < 2) continue; + let stateIds = []; + let managers = new Set(); + for (const state of process.states) { + if (state.encrypted_pcd === null) continue; + const roles = state.encrypted_pcd['roles']; + if (!roles) { + console.error('Can\'t find roles'); + continue; + } + + if (this.rolesContainsUs(roles)) { + // We add this state to the list to request + stateIds.push(state.state_id); + } else { + continue; + } + + // For now we just add everyone that is in the same role than us + // const sendTo = this.membersInSameRoleThanUs(roles); + for (const [_, role] of Object.entries(roles)) { + if (!role.members.includes(us)) continue; + for (const member of role.members) { + if (member !== us) { + managers.push(member); + } + } + } + } + try { + this.sdkClient.request_data(processId, stateIds, managers); + } catch (e) { + console.error(e); + } + } await this.updateProcessesFromRelay(newProcesses); } }, 500) diff --git a/src/services/storage.service.ts b/src/services/storage.service.ts index 23d344f..06daae8 100644 --- a/src/services/storage.service.ts +++ b/src/services/storage.service.ts @@ -4,14 +4,16 @@ export async function storeData(servers: string[], key: string, value: any, ttl: for (const server of servers) { try { const response = await axios.post(`${server}/store`, { key, value, ttl }); - console.log('Data stored successfully:', response.data); + console.log('Data stored successfully:', key); if (response.status !== 200) { console.error('Received response status', response.status); continue; } - console.log('Stored data:', response.data); return response; } catch (error) { + if (error?.response?.status === 409) { + return null; + } console.error('Error storing data:', error); } } @@ -34,3 +36,33 @@ export async function retrieveData(servers: string[], key: string): Promise { + for (const server of servers) { + try { + const response = await axios.get(`${server}/test/${key}`); + if (response.status !== 200) { + console.error('Test response status', response.status); + continue; + } + + const data: TestResponse = response.data; + + if (data.value === true) { + return true; + } else { + // Keep looking + continue; + } + } catch (error) { + console.error('Error retrieving data:', error); + } + } + + return null; +}