MessageBus refactoring
* better error management * Keep tracks of when messages sent don't have answers * New convenient methods
This commit is contained in:
parent
68ecdf181f
commit
de2ff30960
@ -11,8 +11,8 @@ import { FileBlob } from '../front/Api/Entities/types';
|
|||||||
|
|
||||||
export default class MessageBus {
|
export default class MessageBus {
|
||||||
private static instance: MessageBus;
|
private static instance: MessageBus;
|
||||||
private errors: { [key: string]: string } = {};
|
|
||||||
private isListening: boolean = false;
|
private isListening: boolean = false;
|
||||||
|
private messagesSent: Set<string> = new Set();
|
||||||
|
|
||||||
public static getInstance(): MessageBus {
|
public static getInstance(): MessageBus {
|
||||||
if (!MessageBus.instance) {
|
if (!MessageBus.instance) {
|
||||||
@ -27,6 +27,11 @@ export default class MessageBus {
|
|||||||
|
|
||||||
public destroyMessageListener(): void {
|
public destroyMessageListener(): void {
|
||||||
window.removeEventListener('message', this.handleMessage.bind(this));
|
window.removeEventListener('message', this.handleMessage.bind(this));
|
||||||
|
this.isListening = false; // Reset the flag when destroying listener
|
||||||
|
}
|
||||||
|
|
||||||
|
public resetListeningState(): void {
|
||||||
|
this.isListening = false; // Allow external components to reset listening state
|
||||||
}
|
}
|
||||||
|
|
||||||
public isReady(): Promise<void> {
|
public isReady(): Promise<void> {
|
||||||
@ -41,20 +46,61 @@ export default class MessageBus {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public isWaitingForMessage(): boolean {
|
||||||
|
return this.messagesSent.size > 0;
|
||||||
|
}
|
||||||
|
|
||||||
public requestLink(): Promise<void> {
|
public requestLink(): Promise<void> {
|
||||||
return new Promise<void>((resolve: () => void, reject: (error: string) => void) => {
|
return new Promise<void>((resolve: () => void, reject: (error: string) => void) => {
|
||||||
const messageId = `REQUEST_LINK_${uuidv4()}`;
|
const messageId = `REQUEST_LINK_${uuidv4()}`;
|
||||||
|
console.log('[MessageBus] requestLink - waiting for messageId:', messageId);
|
||||||
|
|
||||||
const unsubscribe = EventBus.getInstance().on('LINK_ACCEPTED', (responseId: string, message: { accessToken: string, refreshToken: string }) => {
|
const unsubscribe = EventBus.getInstance().on('LINK_ACCEPTED', (responseId: string, message: { accessToken: string, refreshToken: string }) => {
|
||||||
|
console.log('[MessageBus] LINK_ACCEPTED received with responseId:', responseId, 'expected:', messageId);
|
||||||
if (responseId !== messageId) {
|
if (responseId !== messageId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
console.log('[MessageBus] LINK_ACCEPTED matched - resolving');
|
||||||
unsubscribe();
|
unsubscribe();
|
||||||
|
unsubscribeError();
|
||||||
User.getInstance().setTokens(message.accessToken, message.refreshToken);
|
User.getInstance().setTokens(message.accessToken, message.refreshToken);
|
||||||
resolve();
|
resolve();
|
||||||
});
|
});
|
||||||
|
|
||||||
const unsubscribeError = EventBus.getInstance().on('ERROR_LINK_ACCEPTED', (responseId: string, error: string) => {
|
const unsubscribeError = EventBus.getInstance().on('ERROR_LINK_ACCEPTED', (responseId: string, error: string) => {
|
||||||
|
console.log('[MessageBus] ERROR_LINK_ACCEPTED received with responseId:', responseId, 'expected:', messageId);
|
||||||
|
if (responseId !== messageId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
console.log('[MessageBus] ERROR_LINK_ACCEPTED matched - rejecting with error:', error);
|
||||||
|
unsubscribe();
|
||||||
|
unsubscribeError();
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log('[MessageBus] requestLink - sending REQUEST_LINK message');
|
||||||
|
this.sendMessage({
|
||||||
|
type: 'REQUEST_LINK',
|
||||||
|
messageId
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public createPairing(): Promise<void> {
|
||||||
|
return new Promise<void>((resolve: () => void, reject: (error: string) => void) => {
|
||||||
|
this.checkToken().then(() => {
|
||||||
|
const messageId = `CREATE_PAIRING_${uuidv4()}`;
|
||||||
|
|
||||||
|
const unsubscribe = EventBus.getInstance().on('PAIRING_CREATED', (responseId: string, pairingId: string) => {
|
||||||
|
if (responseId !== messageId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
unsubscribe();
|
||||||
|
User.getInstance().setPairingId(pairingId);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
|
||||||
|
const unsubscribeError = EventBus.getInstance().on('ERROR_CREATE_PAIRING', (responseId: string, error: string) => {
|
||||||
if (responseId !== messageId) {
|
if (responseId !== messageId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -62,10 +108,15 @@ export default class MessageBus {
|
|||||||
reject(error);
|
reject(error);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const user = User.getInstance();
|
||||||
|
const accessToken = user.getAccessToken()!;
|
||||||
|
|
||||||
this.sendMessage({
|
this.sendMessage({
|
||||||
type: 'REQUEST_LINK',
|
type: 'CREATE_PAIRING',
|
||||||
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,7 +149,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,6 +349,114 @@ export default class MessageBus {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public getRolesForProcess(processId: string): Promise<any> {
|
||||||
|
return new Promise<any>((resolve: (roles: any[]) => void, reject: (error: string) => void) => {
|
||||||
|
this.getAllProcesses().then((processes: any) => {
|
||||||
|
const process = processes[processId];
|
||||||
|
|
||||||
|
if (!process.states || process.states.length < 2) {
|
||||||
|
reject('No states found for process');
|
||||||
|
}
|
||||||
|
|
||||||
|
const roles = process.states[process.states.length - 2].roles;
|
||||||
|
if (!roles) {
|
||||||
|
reject('No roles found for process');
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve(roles);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns all processes details, including processes we only have public data for
|
||||||
|
public getAllProcessesDecoded(filterPublicValues: (publicValues: { [key: string]: any }) => boolean): Promise<any[]> {
|
||||||
|
return new Promise<any[]>((resolve: (processesDecoded: any[]) => void, reject: (error: string) => void) => {
|
||||||
|
this.getAllProcesses().then(async (processes: any) => {
|
||||||
|
const processesDecoded: any[] = [];
|
||||||
|
|
||||||
|
for (const processId of Object.keys(processes)) {
|
||||||
|
const process = processes[processId];
|
||||||
|
if (!process.states) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const publicDataDecoded: { [key: string]: any } = {};
|
||||||
|
|
||||||
|
for (let stateId = 0; stateId < process.states.length - 1; stateId++) {
|
||||||
|
const state = process.states[stateId];
|
||||||
|
if (!state) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const publicDataEncoded = state.public_data;
|
||||||
|
if (!publicDataEncoded) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const key of Object.keys(publicDataEncoded)) {
|
||||||
|
publicDataDecoded[key] = await this.getPublicData(publicDataEncoded[key]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!filterPublicValues(publicDataDecoded)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let processDecoded: any;
|
||||||
|
|
||||||
|
for (let stateId = 0; stateId < process.states.length - 1; stateId++) {
|
||||||
|
const lastState = process.states[stateId];
|
||||||
|
if (!lastState) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const lastStateId = lastState.state_id;
|
||||||
|
if (!lastStateId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
let processData = await this.getData(processId, lastStateId);
|
||||||
|
if (!processData) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const isEmpty = Object.keys(processData).length === 0;
|
||||||
|
if (isEmpty) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const key of Object.keys(publicDataDecoded)) {
|
||||||
|
processData[key] = publicDataDecoded[key];
|
||||||
|
}
|
||||||
|
processData = MapUtils.toJson(processData);
|
||||||
|
|
||||||
|
if (!processDecoded) {
|
||||||
|
processDecoded = {
|
||||||
|
processId,
|
||||||
|
lastStateId,
|
||||||
|
processData,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
for (const key of Object.keys(processData)) {
|
||||||
|
processDecoded.processData[key] = processData[key];
|
||||||
|
}
|
||||||
|
processDecoded.lastStateId = lastStateId;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
processesDecoded.push(processDecoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve(processesDecoded);
|
||||||
|
}).catch(reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns details about processes that we are involved in
|
||||||
public getProcessesDecoded(filterPublicValues: (publicValues: { [key: string]: any }) => boolean): Promise<any[]> {
|
public getProcessesDecoded(filterPublicValues: (publicValues: { [key: string]: any }) => boolean): Promise<any[]> {
|
||||||
return new Promise<any[]>((resolve: (processesDecoded: any[]) => void, reject: (error: string) => void) => {
|
return new Promise<any[]>((resolve: (processesDecoded: any[]) => void, reject: (error: string) => void) => {
|
||||||
this.getProcesses().then(async (processes: any) => {
|
this.getProcesses().then(async (processes: any) => {
|
||||||
@ -417,7 +576,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -454,12 +613,48 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public getProcesses(): Promise<any> {
|
// Returns all processes, including processes that have nothing to do with us
|
||||||
return new Promise<any>((resolve: (processes: any) => void, reject: (error: string) => void) => {
|
public getAllProcesses(): Promise<{ [processId: string]: any }> {
|
||||||
|
return new Promise<{ [processId: string]: any }>((resolve: (processes: { [processId: string]: any }) => void, reject: (error: string) => void) => {
|
||||||
|
this.checkToken().then(() => {
|
||||||
|
const messageId = `GET_PROCESSES_${uuidv4()}`;
|
||||||
|
|
||||||
|
const unsubscribe = EventBus.getInstance().on('PROCESSES_RETRIEVED', (responseId: string, processes: any) => {
|
||||||
|
if (responseId !== messageId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
unsubscribe();
|
||||||
|
|
||||||
|
resolve(processes);
|
||||||
|
});
|
||||||
|
|
||||||
|
const unsubscribeError = EventBus.getInstance().on('ERROR_PROCESSES_RETRIEVED', (responseId: string, error: string) => {
|
||||||
|
if (responseId !== messageId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
unsubscribeError();
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
const user = User.getInstance();
|
||||||
|
const accessToken = user.getAccessToken()!;
|
||||||
|
|
||||||
|
this.sendMessage({
|
||||||
|
type: 'GET_PROCESSES',
|
||||||
|
accessToken,
|
||||||
|
messageId
|
||||||
|
});
|
||||||
|
}).catch(reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns processes that we are involved in
|
||||||
|
public getProcesses(): Promise<{ [processId: string]: any }> {
|
||||||
|
return new Promise<{ [processId: string]: any }>((resolve: (processes: { [processId: string]: any }) => void, reject: (error: string) => void) => {
|
||||||
this.checkToken().then(() => {
|
this.checkToken().then(() => {
|
||||||
const messageId = `GET_PROCESSES_${uuidv4()}`;
|
const messageId = `GET_PROCESSES_${uuidv4()}`;
|
||||||
|
|
||||||
@ -471,7 +666,7 @@ export default class MessageBus {
|
|||||||
|
|
||||||
// Filter processes by my processes
|
// Filter processes by my processes
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
this.getMyProcesses().then((myProcesses: any) => {
|
this.getMyProcesses().then((myProcesses: string[]) => {
|
||||||
const processesFiltered: { [processId: string]: any } = {};
|
const processesFiltered: { [processId: string]: any } = {};
|
||||||
for (const processId of myProcesses) {
|
for (const processId of myProcesses) {
|
||||||
const process = processes[processId];
|
const process = processes[processId];
|
||||||
@ -500,16 +695,18 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public getMyProcesses(): Promise<any> {
|
// Returns the processes id of processes we are involved in
|
||||||
return new Promise<any>((resolve: (processes: any) => void, reject: (error: string) => void) => {
|
// It's meant to be used to filter processes in the getProcesses() method
|
||||||
|
public getMyProcesses(): Promise<string[]> {
|
||||||
|
return new Promise<string[]>((resolve: (processes: string[]) => void, reject: (error: string) => void) => {
|
||||||
this.checkToken().then(() => {
|
this.checkToken().then(() => {
|
||||||
const messageId = `GET_MY_PROCESSES_${uuidv4()}`;
|
const messageId = `GET_MY_PROCESSES_${uuidv4()}`;
|
||||||
|
|
||||||
const unsubscribe = EventBus.getInstance().on('GET_MY_PROCESSES', (responseId: string, processes: any) => {
|
const unsubscribe = EventBus.getInstance().on('GET_MY_PROCESSES', (responseId: string, processes: string[]) => {
|
||||||
if (responseId !== messageId) {
|
if (responseId !== messageId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -533,7 +730,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -568,7 +765,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -603,7 +800,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -638,7 +835,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -672,7 +869,7 @@ export default class MessageBus {
|
|||||||
accessToken,
|
accessToken,
|
||||||
messageId
|
messageId
|
||||||
});
|
});
|
||||||
}).catch(console.error);
|
}).catch(reject);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -874,6 +1071,7 @@ export default class MessageBus {
|
|||||||
const targetOrigin = IframeReference.getTargetOrigin();
|
const targetOrigin = IframeReference.getTargetOrigin();
|
||||||
const iframe = IframeReference.getIframe();
|
const iframe = IframeReference.getIframe();
|
||||||
iframe.contentWindow?.postMessage(message, targetOrigin);
|
iframe.contentWindow?.postMessage(message, targetOrigin);
|
||||||
|
this.messagesSent.add(message.messageId);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('[MessageBus] sendMessage: error', error);
|
console.error('[MessageBus] sendMessage: error', error);
|
||||||
}
|
}
|
||||||
@ -975,19 +1173,26 @@ export default class MessageBus {
|
|||||||
|
|
||||||
case 'ERROR':
|
case 'ERROR':
|
||||||
console.error('Error:', message);
|
console.error('Error:', message);
|
||||||
this.errors[message.messageId] = message.error;
|
// Extract operation type from messageId by splitting on last underscore
|
||||||
|
const operationType = this.extractOperationTypeFromMessageId(message.messageId);
|
||||||
|
EventBus.getInstance().emit(`ERROR_${operationType}`, message.messageId, message.error);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private doHandleMessage(messageId: string, messageType: string, message: any, callback: (message: any) => any) {
|
private extractOperationTypeFromMessageId(messageId: string): string {
|
||||||
if (this.errors[messageId]) {
|
// Split on last underscore to extract operation type
|
||||||
const error = this.errors[messageId];
|
// e.g., "GET_PAIRING_ID_abc123" -> "GET_PAIRING_ID"
|
||||||
delete this.errors[messageId];
|
const lastUnderscoreIndex = messageId.lastIndexOf('_');
|
||||||
EventBus.getInstance().emit(`ERROR_${messageType}`, messageId, error);
|
if (lastUnderscoreIndex === -1) {
|
||||||
return;
|
return messageId; // No underscore found, return as-is
|
||||||
}
|
}
|
||||||
|
return messageId.substring(0, lastUnderscoreIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
private doHandleMessage(messageId: string, messageType: string, message: any, callback: (message: any) => any) {
|
||||||
EventBus.getInstance().emit('MESSAGE_RECEIVED', message);
|
EventBus.getInstance().emit('MESSAGE_RECEIVED', message);
|
||||||
EventBus.getInstance().emit(messageType, messageId, callback(message));
|
EventBus.getInstance().emit(messageType, messageId, callback(message));
|
||||||
|
this.messagesSent.delete(messageId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user