import { getListener, initializeListenerConnection } from '@marlin/shared/utils-realtime-communication';
import { BehaviorSubject, Observable, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

import { TFlowLoopNotification, flowLoopNotificationSchema } from '../loop-detected-notification.model.schema';
import { SYSTEM_MAP_EVENTS } from './system-map-events.enum';

/**
 * Error type exported by this module for timeout conditions.
 *
 * Sets `name` to `'TimeoutError'` so that logs, `toString()` output and
 * name-based checks can tell it apart from a plain `Error`. Without this the
 * inherited `name` would be `'Error'`.
 */
export class TimeoutError extends Error {
  /**
   * @param args Same arguments the built-in `Error` constructor accepts
   *   (forwarded unchanged, so the constructor signature is not narrowed).
   */
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args);
    this.name = 'TimeoutError';
  }
}

/**
 * Looks up the listener subject registered for the
 * `SYSTEM_MAP_EVENTS.SEND_SYSTEM_LOOP_UPDATE` event.
 *
 * `undefined` is passed as the second argument to `getListener`
 * (presumably the subject's initial value — confirm against `getListener`).
 *
 * @returns The `BehaviorSubject` returned by `getListener`, carrying either a
 *   flow-loop notification or `undefined`.
 */
export const getSystemLoopNotificationSubject = (): BehaviorSubject<TFlowLoopNotification | undefined> =>
  getListener<TFlowLoopNotification | undefined>(SYSTEM_MAP_EVENTS.SEND_SYSTEM_LOOP_UPDATE, undefined);

/**
 * Operator applied to the raw notification stream: every emitted value is
 * validated against `flowLoopNotificationSchema`.
 *
 * - A value that passes validation is re-emitted as the parsed data.
 * - A value that fails validation is replaced by `undefined` (the stream is
 *   not errored for invalid payloads).
 * - Errors raised upstream are re-thrown unchanged to the subscriber.
 *
 * @param source$ Stream of incoming notifications (or `undefined`).
 * @returns A stream emitting the validated notification or `undefined`.
 */
const mapUpdateOperator = (
  source$: Observable<TFlowLoopNotification | undefined>
): Observable<TFlowLoopNotification | undefined> => {
  // Schema-check a single payload; anything that does not validate collapses to `undefined`.
  const toValidNotification = (
    payload: TFlowLoopNotification | undefined
  ): TFlowLoopNotification | undefined => {
    const outcome = flowLoopNotificationSchema.safeParse(payload);
    return outcome.success ? outcome.data : undefined;
  };

  return source$.pipe(
    map((payload: TFlowLoopNotification | undefined) => toValidNotification(payload)),
    // Pass-through: the caught error is forwarded as-is.
    catchError((err) => throwError(err))
  );
};

/**
 * Initializes the listener connection for the
 * `SYSTEM_MAP_EVENTS.SEND_SYSTEM_LOOP_UPDATE` event.
 *
 * Delegates to `initializeListenerConnection`, passing `undefined` as the
 * second argument (presumably the initial value — confirm against the helper)
 * and `mapUpdateOperator` as the operator, which schema-validates each
 * notification.
 *
 * @returns The `Observable<void>` produced by `initializeListenerConnection`.
 */
export const initSubject = (): Observable<void> =>
  initializeListenerConnection<TFlowLoopNotification | undefined>(
    SYSTEM_MAP_EVENTS.SEND_SYSTEM_LOOP_UPDATE,
    undefined,
    mapUpdateOperator
  );
