import { RxStomp as RxStompMain, RxStompState } from '@stomp/rx-stomp';
import { fromEvent, interval, merge, Observable, Subject, Subscription } from 'rxjs';
import { map, switchMap, tap } from 'rxjs/operators';
import { default as SockJsMain } from 'sockjs-client';

/** Arguments accepted by `RxStompSocket.openConnection`. */
interface IOpenConnectionProps {
	/**
	 * Endpoint suffix. The socket appends it to a prefix built from
	 * `window.location` that ends in `/api/v1/ws`.
	 */
	url: string;
	/** Optional SockJS options, forwarded unchanged to the SockJS constructor. */
	options?: SockJS.Options;
}

/** Constructor arguments for `RxStompSocket`. */
interface IRxStompSocketConstructor {
	/**
	 * Silence budget in milliseconds. The connection monitor counts it down by
	 * 1000 on every one-second tick and restores it whenever a socket message
	 * is observed; a negative remainder triggers a reconnect attempt.
	 */
	heartBeatTimeMs: number;
}

/** Tags for the browser's `offline` / `online` window events. */
type IWSOnlineStateConnectionEvent = 'OFFLINE_EVENT' | 'ONLINE_EVENT';
/**
 * Every event the connection monitor reacts to: the one-second timer tick
 * (`COUNTER_EVENT`), an incoming socket message (`SOCKET_EVENT`), and the
 * browser online/offline tags.
 */
type IWSConnectionEvent = 'COUNTER_EVENT' | 'SOCKET_EVENT' | IWSOnlineStateConnectionEvent;

/**
 * RxStomp client that speaks STOMP over SockJS and watches the connection
 * for silence.
 *
 * While connected, a monitor counts down `heartBeatTimeMs` in one-second
 * steps. Any message seen on the underlying socket restores the budget; when
 * the budget runs out, or the browser reports an online/offline change, the
 * client is asked to activate again.
 */
class RxStompSocket extends RxStompMain {
	private monitorConnectionSubscription: Subscription | undefined;
	private readonly heartBeatTimeMs: number;
	/**
	 * Relays raw transport messages from whichever SockJS instance is
	 * currently in use, so the monitor survives socket replacement.
	 */
	private readonly socketMessage$ = new Subject<unknown>();

	/**
	 * @param heartBeatTimeMs Silence budget in milliseconds before the monitor
	 * asks for a reconnect.
	 */
	constructor({ heartBeatTimeMs }: IRxStompSocketConstructor) {
		super();
		this.heartBeatTimeMs = heartBeatTimeMs;
		// NOTE(review): once this handler runs, the monitor stays unsubscribed
		// until openConnection() is called again — confirm that is intended.
		this.stompClient.onDisconnect = () => {
			this.monitorConnectionSubscription?.unsubscribe();
			this.reconnect();
		};
	}

	/**
	 * Configures the client for the given endpoint, activates it and starts
	 * the connection monitor.
	 *
	 * @param url Endpoint suffix appended to the `/api/v1/ws` prefix.
	 * @param options Optional SockJS options.
	 */
	openConnection({ url, options }: IOpenConnectionProps) {
		// Drop a monitor left over from a previous call so that repeated
		// openConnection() calls do not stack up parallel monitors.
		this.monitorConnectionSubscription?.unsubscribe();

		this.configure({
			// The factory is invoked for every connection attempt, so it must
			// hand out a fresh socket each time: a closed SockJS instance
			// cannot be reused for a reconnect.
			webSocketFactory: () => {
				const socket = new this.SockJS(url, options);

				socket.message$.subscribe((message) => this.socketMessage$.next(message));

				return socket;
			},
			reconnectDelay: 2000,
		});
		this.activate();
		this.monitorConnectionSubscription = this.monitorConnection(this.socketMessage$);
	}

	/**
	 * Subscribes to the stream of connection events and calls `activate()`
	 * whenever the heartbeat budget is exhausted or the browser comes back
	 * online.
	 *
	 * @param messages$ Emits once for every message received on the socket.
	 * @returns The subscription that keeps the monitor alive.
	 */
	private monitorConnection(messages$: Observable<unknown>) {
		let heartbeatTimeLeft = this.heartBeatTimeMs;

		return this.connected$
			.pipe(
				switchMap(() => {
					// Every (re)connection starts with a full budget; otherwise a
					// countdown that already expired would keep firing on each
					// tick until the first message arrives.
					heartbeatTimeLeft = this.heartBeatTimeMs;

					return merge(
						fromEvent(window, 'online').pipe(map((): IWSOnlineStateConnectionEvent => 'ONLINE_EVENT')),
						fromEvent(window, 'offline').pipe(map((): IWSOnlineStateConnectionEvent => 'OFFLINE_EVENT')),
						interval(1000).pipe(map((): IWSConnectionEvent => 'COUNTER_EVENT')),
						messages$.pipe(map((): IWSConnectionEvent => 'SOCKET_EVENT'))
					);
				}),
				map((event: IWSConnectionEvent) => {
					switch (event) {
						case 'COUNTER_EVENT':
							// One timer tick equals one second of silence.
							heartbeatTimeLeft -= 1000;
							break;
						case 'SOCKET_EVENT':
							// Traffic proves the link is alive: restore the budget.
							heartbeatTimeLeft = this.heartBeatTimeMs;
							break;
						case 'OFFLINE_EVENT':
							// Force the budget below zero so the check below fires.
							heartbeatTimeLeft = -1000;
							break;
						case 'ONLINE_EVENT':
							return true;
					}
					return heartbeatTimeLeft < 0;
				}),
				tap((shouldReconnect) => {
					if (shouldReconnect) {
						this.activate();
					}
				})
			)
			.subscribe();
	}

	/**
	 * SockJS subclass that resolves the endpoint against the current page
	 * origin and exposes every transport message as an observable.
	 */
	private SockJS = class extends SockJsMain {
		private _message$: Subject<unknown>;

		/**
		 * @param url Endpoint suffix appended to `<origin>/api/v1/ws`.
		 * @param options Optional SockJS options.
		 */
		constructor(url: string, options?: SockJS.Options) {
			const { protocol, hostname, port } = window.location;
			// The port is only included in development builds.
			const prefix = `${protocol}//${hostname}${
				process.env.NODE_ENV === 'development' ? `:${port}` : ''
			}/api/v1/ws`;
			const urlSocket = `${prefix}${url}`;

			super(urlSocket, undefined, options);
			this._message$ = new Subject();
		}

		/**
		 * Forwards the message to the base implementation, then publishes it
		 * on `message$`.
		 */
		_transportMessage(message: unknown) {
			// Presumably an internal sockjs-client hook that the typings do not
			// declare, hence the suppression — TODO confirm.
			//@ts-ignore
			super._transportMessage(message);

			this._message$.next(message);
		}

		/** Emits every message passed through `_transportMessage`. */
		public get message$(): Subject<unknown> {
			return this._message$;
		}
	};

	/** Re-activates the client, but only when the connection state is CLOSED. */
	private reconnect() {
		if (this.connectionState$.value === RxStompState.CLOSED) {
			this.activate();
		}
	}
}

// The class is this module's default export.
export default RxStompSocket;
