import { RestConnection } from '../RestConnection';
import { ConnectionState } from './ConnectionState';
import * as Helpers from '../../Helpers/Helpers';
import { RestResponse } from '../RestResponse';
import HttpStatusCode from '../../Client/Enumerations/HttpStatusCode';
import { StreamerBase } from './StreamerBase';
import { FieldObject } from '../../Helpers/FieldObject';
import { EventSourcePolyfill } from 'ng-event-source';
import { IFieldObject } from '../../Client/Interface/IFieldObject';

/**
 * Streamer implementation that opens the `v2/EntitiesStream` endpoint over a
 * WebSocket (when enabled on the parent connection) and falls back to
 * Server-Sent Events (SSE) when the WebSocket attempt does not return OK.
 */
export class Streamer extends StreamerBase {
    // #region Fields

    /** Currently open SSE source, or null when none is open. */
    private _evtSource: EventSourcePolyfill | null = null;
    /** Currently open WebSocket, or null when none is open. */
    private _ws: WebSocket | null = null;

    // #endregion

    // #region Properties

    // #endregion

    // #region Constructor, Destructor, Dispose

    /**
     * @param parent Connection that supplies the server URL, headers and transport flags.
     */
    constructor(parent: RestConnection) {
        super(parent);
    }

    // #endregion

    // #region Override methods

    /**
     * Starts the streamer. Any previously stored start token is cancelled first.
     *
     * WebSocket is attempted when `enableWebSocket` is true; if it does not
     * answer with OK, SSE is attempted when `enableSSE` is true.
     *
     * @param cancelToken Optional token that is stored and checked once the handshake completes.
     * @returns The OK response of the transport that connected, or an
     *          INTERNAL_SERVER_ERROR response when SSE is disabled and WebSocket did not connect.
     *          The returned promise rejects with a RestResponse when the handshake fails or is cancelled.
     */
    public async startStreamerAsync(cancelToken?: Helpers.CancellationToken | null): Promise<RestResponse> {
        this._streamerStarted = true;
        if (this._startStreamerToken != null) {
            this._startStreamerToken.cancelToken();
            this._startStreamerToken = null;
        }
        if (cancelToken != null) {
            this._startStreamerToken = cancelToken;
        }
        await this.onStateChangedAsync(ConnectionState.Connecting);

        this._parent.isWebSocket = false;
        if (this._parent.enableWebSocket === true) {
            // Connect with WebSocket
            const response = await this.startWebSocket();
            if (response.statusCode === HttpStatusCode.OK) {
                this._parent.isWebSocket = true;
                return response;
            }
        }
        if (this._parent.enableSSE === true) {
            // Connect with SSE
            return await this.startSSE();
        }
        return new RestResponse(HttpStatusCode.INTERNAL_SERVER_ERROR);
    }

    /**
     * Sends a message over the WebSocket. Does nothing unless the parent is
     * flagged as using WebSocket and a socket is currently held.
     *
     * @param message Text payload to send.
     */
    public writeMessageToWebSocket(message: string): void {
        if (this._parent.isWebSocket && this._ws) {
            this._ws.send(message);
        }
    }

    /**
     * Stops the streamer: asks the server to disconnect (best effort), closes
     * whichever transports are open and cancels the stored start token.
     */
    protected async stopStreamerInternalImplAsync(): Promise<void> {
        try {
            await this.invokeOnStreamerAsync('Disconnect');
        } catch {
            // ignore error... we might be already disconnected
        }

        this.closeEventSource();
        this.closeWebSocket();
        if (this._startStreamerToken != null) {
            this._startStreamerToken.cancelToken();
            this._startStreamerToken = null;
        }
    }

    // #endregion

    // #region Private methods

    /** Builds the absolute `v2/EntitiesStream` URL for the given query string. */
    private buildStreamUrl(query: string): string {
        const base: string = this._parent.restServerUrl;
        const separator = base.endsWith('/') ? '' : '/';
        return base + separator + 'v2/EntitiesStream?' + query;
    }

    /** Closes and forgets the SSE source, if one is open. */
    private closeEventSource(): void {
        if (this._evtSource) {
            this._evtSource.close();
            this._evtSource = null;
        }
    }

    /** Closes and forgets the WebSocket, if one is open. */
    private closeWebSocket(): void {
        if (this._ws) {
            this._ws.close();
            this._ws = null;
        }
    }

    /**
     * Runs the post-connection handshake shared by both transports and settles
     * the start promise accordingly.
     *
     * @param closeTransport Closes the transport that is being started; called on every failure path.
     * @param resolve Settles the start promise with an OK response.
     * @param reject Settles the start promise with NO_CONTENT (cancelled) or UNAUTHORIZED (login failed / error).
     */
    private async completeHandshakeAsync(
        closeTransport: () => void,
        resolve: (response: RestResponse) => void,
        reject: (reason: RestResponse) => void
    ): Promise<void> {
        try {
            // Execute 1 call to make sure the proxy has initialized properly (ValidateCredential issue...)
            const logonResult = await this.verifyConnectionAsync();
            const statusCode = logonResult.getField<number>('LoginResult');
            const reason = logonResult.getFieldObject<FieldObject, IFieldObject>(FieldObject, 'FailedReason');
            await this._streamerSessionIdReceivedDispatcher.dispatch(this._streamerConnectionId);

            if (this._startStreamerToken?.isCancellationRequested === true) {
                // The caller cancelled while the handshake was in flight.
                closeTransport();
                reject(new RestResponse(HttpStatusCode.NO_CONTENT, '', null, true));
            } else if (statusCode !== HttpStatusCode.OK) {
                closeTransport();
                reject(new RestResponse(HttpStatusCode.UNAUTHORIZED, reason.toString(), null, false));
            } else {
                await this.onStateChangedAsync(ConnectionState.Connected);
                resolve(new RestResponse(HttpStatusCode.OK, null, null, false));
            }
        } catch {
            closeTransport();
            reject(new RestResponse(HttpStatusCode.UNAUTHORIZED, null, null, false));
        }
    }

    /**
     * Opens the SSE stream and resolves once the handshake succeeds.
     *
     * NOTE(review): the error handler only reports the Disconnected state; it
     * does not settle the returned promise, so an error before the handshake
     * leaves the promise pending.
     */
    private async startSSE(): Promise<RestResponse> {
        const restUrl = this.buildStreamUrl('sse=true');
        this._streamerConnectionId = '';

        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- option bag is passed untyped to the polyfill
        const options: any = { headers: {} };
        options.headers.Authorization = this._parent.authorizationHeader;
        for (const [key, value] of this._parent.additionalHeaders) {
            options.headers[key] = value;
        }

        return new Promise<RestResponse>((resolve, reject) => {
            const source = new EventSourcePolyfill(restUrl, options);
            this._evtSource = source;
            source.onopen = async () => {};

            source.onerror = async () => {
                await this.onStateChangedAsync(ConnectionState.Disconnected);
            };

            source.onmessage = async (result) => {
                const fo = new FieldObject();
                fo.loadFields(JSON.parse(result.data));
                // NOTE(review): the result is used as a synchronous truthy check — confirm
                // parseResponseAsync does not return a Promise.
                if (this.parseResponseAsync(fo)) {
                    // Got the connection id. On failure close the SSE source (not the WebSocket).
                    await this.completeHandshakeAsync(() => this.closeEventSource(), resolve, reject);
                }
            };
        });
    }

    /**
     * Opens the WebSocket stream and resolves once the handshake succeeds.
     *
     * Headers are passed as a JSON blob in the `headers` query parameter. A
     * socket error resolves with an UNAUTHORIZED response, except while the
     * connection state is Reconnecting, where it rejects with NOT_FOUND.
     */
    private async startWebSocket(): Promise<RestResponse> {
        // Swap only the leading scheme: http -> ws, https -> wss.
        const baseUrl = this.buildStreamUrl('websocket=true').replace(/^http/i, 'ws');

        this._streamerConnectionId = '';
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- header values come untyped from the parent
        const headers: any = {};
        headers.Authorization = this._parent.authorizationHeader;
        for (const [key, value] of this._parent.additionalHeaders) {
            headers[key] = value;
        }
        // Encode the JSON so characters such as '&', '#', '+' or '%' inside a
        // header value cannot truncate or corrupt the query parameter.
        const restUrl = baseUrl + '&headers=' + encodeURIComponent(JSON.stringify(headers));

        return new Promise<RestResponse>((resolve, reject) => {
            const socket = new WebSocket(restUrl);
            this._ws = socket;

            socket.onmessage = async (event) => {
                const fo = new FieldObject();
                fo.loadFields(JSON.parse(event.data));
                // NOTE(review): the result is used as a synchronous truthy check — confirm
                // parseResponseAsync does not return a Promise.
                if (this.parseResponseAsync(fo)) {
                    // Got the connection id
                    await this.completeHandshakeAsync(() => this.closeWebSocket(), resolve, reject);
                }
            };

            socket.onclose = async () => {
                await this.onStateChangedAsync(ConnectionState.Disconnected);
            };

            socket.onerror = async () => {
                if (this.streamerConnectionState === ConnectionState.Reconnecting) {
                    reject(new RestResponse(HttpStatusCode.NOT_FOUND, '', null, false));
                } else {
                    await this.onStateChangedAsync(ConnectionState.Disconnected);
                    resolve(new RestResponse(HttpStatusCode.UNAUTHORIZED, '', null, false));
                }
            };
        });
    }

    // #endregion
}
