import { logoutUser } from '@features/auth/store/auth.actions';
import { IWSOnsiteNotification } from '@features/onsiteNotifications/models/onsiteNotification-message.model';
import { openNotificationsSocket } from '@features/onsiteNotifications/store/onsiteNotifications.actions';
import {
	closeNotificationsSocket,
	onsiteNotificationReceivedWS,
	setIsNotificationsSocketOpened,
} from '@features/onsiteNotifications/store/onsiteNotifications.reducer';
import { RxStompState } from '@stomp/rx-stomp';
import { resetState } from '@store/root.actions';
import { ofType } from 'redux-observable';
import { from } from 'rxjs';
import { filter, map, switchMap, tap, withLatestFrom } from 'rxjs/operators';
import { IReduxObservableEpic } from './models/epic.model';
import RxStompSocket from './rxStompSocket';
import { wsLogger } from '@utils/wsLogger';

export const rxStompNotifications = new RxStompSocket({ heartBeatTimeMs: 25000 });

const notificationsEpics: IReduxObservableEpic[] = [];

const openNotificationsSocketEpic: IReduxObservableEpic = (actions$, state$) => {
	return actions$.pipe(
		ofType(openNotificationsSocket.fulfilled.type),
		map(({ payload }) => payload.data.token),
		withLatestFrom(state$.pipe(map(({ auth }) => auth.user?.id))),
		map(([token, userId]) => ({ token, userId })),
		filter(({ userId }) => !!userId),
		filter(() => !rxStompNotifications.active),
		tap(({ token, userId }) =>
			rxStompNotifications.openConnection({ url: `/notification?token=${token}&receiverId=${userId}` })
		),
		tap(() => rxStompNotifications.activate()),
		switchMap(({ userId }) => rxStompNotifications.watch(`/notification/${userId}`)),
		tap((message) => wsLogger(message, 'notifications')),
		map(({ body }) => {
			const notification: IWSOnsiteNotification = JSON.parse(body);
			notification.data.dateReceived = new Date().toISOString();
			return notification;
		}),
		map((payload: IWSOnsiteNotification) =>
			onsiteNotificationReceivedWS({
				type: payload.type,
				data: { ...payload.data, type: 'WS' },
			})
		)
	);
};

const closeNotificationsSocketEpic: IReduxObservableEpic = (actions$) => {
	return actions$.pipe(
		ofType(logoutUser.fulfilled.type, resetState.type),
		filter(() => rxStompNotifications.active),
		switchMap(() => from(rxStompNotifications.deactivate())),
		map(closeNotificationsSocket)
	);
};

const notificationsSocketStateEpic: IReduxObservableEpic = (actions$, state$) => {
	return actions$.pipe(
		ofType(openNotificationsSocket.fulfilled.type),
		filter(() => rxStompNotifications.active),
		switchMap(() => rxStompNotifications.connectionState$),
		withLatestFrom(state$.pipe(map((state) => state.onsiteNotifications.isOpened))),
		map(([connectionState, isOpened]) => ({ connectionState, isOpened })),
		filter(
			({ connectionState, isOpened }) =>
				(isOpened && connectionState === RxStompState.CLOSED) ||
				(!isOpened && connectionState === RxStompState.OPEN)
		),
		map(({ connectionState }) => setIsNotificationsSocketOpened(connectionState === RxStompState.OPEN))
	);
};

notificationsEpics.push(openNotificationsSocketEpic, closeNotificationsSocketEpic, notificationsSocketStateEpic);

export default notificationsEpics;
