import { RestConnection } from 'RestClient/Connection/RestConnection';
import { Streamer } from 'RestClient/Connection/Streamers/Streamer';
import { LogonState } from 'RestClient/Client/Args/LogonStateChangedArgs';
import HttpStatusCode from 'RestClient/Client/Enumerations/HttpStatusCode';
import { IFieldObject } from 'RestClient/Client/Interface/IFieldObject';
import { WebAppClient } from './WebAppClient';
import { Observable, Subject, from } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';
import { exhaustMapWithTrailing } from './Rxjs/exhaust-map-with-trailing';
import { CancellationToken } from 'RestClient/Helpers/Helpers';
import { RestResponse } from 'RestClient/Connection/RestResponse';

/**
 * REST connection used by {@link WebAppClient}.
 *
 * Whenever the streamer reports an unknown request whose `msgType` is
 * `'fetchevent'`, the connection issues `GET api/Session/Events` and hands every
 * returned field object to the streamer through `entity_EventReceived`.
 *
 * A failed events request only drops that single poll: the trigger pipeline
 * stays subscribed so later `'fetchevent'` messages are still served.
 */
export class WebAppConnection extends RestConnection {
    /** Client that owns this connection; notified once the streamer session id is received. */
    private readonly _parent: WebAppClient;

    /** Trigger stream: every emission asks for one poll of `api/Session/Events`. */
    private readonly fetchEventSubject = new Subject<void>();

    /** Token handed to every events request and cancelled in {@link disposeAsync}. */
    private readonly cancellationToken = new CancellationToken();

    /** Optional web-socket notifier; it is only emitted on and completed during dispose in this class. */
    private webSocketSubject?: Subject<void>;

    /**
     * @param parent Owning client; its `windowsAuthentificationEnabled` flag is copied to this connection.
     */
    constructor(parent: WebAppClient) {
        super();
        this._parent = parent;
        this.windowsAuthentificationEnabled = parent.windowsAuthentificationEnabled;

        this.fetchEventSubject
            .pipe(
                // NOTE(review): presumably ignores triggers while a request is in flight and
                // replays one trailing trigger afterwards — confirm against the operator.
                exhaustMapWithTrailing(() => this.getEvents()),
                // Keep only successful responses that really carry an event list. `undefined`
                // marks a failed request (see getEvents); null or non-array bodies are dropped
                // here instead of throwing inside the pipeline and terminating it.
                filter(
                    (restResponse): restResponse is RestResponse =>
                        restResponse !== undefined &&
                        restResponse.statusCode === HttpStatusCode.OK &&
                        Array.isArray(restResponse.body)
                ),
                map((restResponse) => restResponse.body as Array<IFieldObject>),
                tap((fieldObjects) => {
                    for (const fieldObject of fieldObjects) {
                        // NOTE(review): relies on the field objects providing a meaningful
                        // toString(); a plain object would yield "[object Object]" — confirm.
                        this._restStreamer.entity_EventReceived(fieldObject.toString());
                    }
                })
            )
            .subscribe();
    }

    /**
     * Disposes the base connection, then shuts down event polling: emits a last
     * trigger, completes the trigger subject, completes the web-socket subject
     * (if one exists) and cancels the shared cancellation token.
     */
    public async disposeAsync(): Promise<void> {
        await super.disposeAsync();

        // NOTE(review): this final next() starts one more events request right before
        // the token below is cancelled — confirm that this is intended.
        this.fetchEventSubject.next();
        this.fetchEventSubject.complete();

        this.disposeWebSocketSubject();

        this.cancellationToken.cancelToken();
    }

    /**
     * Creates the streamer for this connection and wires its dispatchers:
     * `'fetchevent'` messages trigger an events poll, and receiving the streamer
     * session id runs the parent's after-logon operation.
     */
    public buildStreamer(): void {
        this._restStreamer = new Streamer(this);

        this._restStreamer._unknownRequestDispatcher.subscribe(async (arg) => {
            if (arg.msgType === 'fetchevent') {
                this.fetchEventSubject.next();
            }
        });

        this._restStreamer._streamerSessionIdReceivedDispatcher.subscribe(async (_: string) => {
            try {
                await this._parent.afterLogonOperationAsync(LogonState.LoggedOn);
            } catch (e) {
                // The after-logon operation failed: tear the connection down.
                await this.disposeAsync();
            }
        });
    }

    /** Emits on and completes the web-socket subject when one has been created. */
    private disposeWebSocketSubject(): void {
        if (this.webSocketSubject) {
            this.webSocketSubject.next();
            this.webSocketSubject.complete();
        }
    }

    /**
     * Requests the pending session events.
     *
     * @returns An observable emitting the REST response, or `undefined` when the
     * request failed. Failures are logged and swallowed here so that a single bad
     * poll cannot error out — and thereby permanently stop — the trigger pipeline.
     */
    private getEvents(): Observable<RestResponse | undefined> {
        const requestEvents = async (): Promise<RestResponse | undefined> => {
            try {
                return await this.executeRequestAsync('GET', 'api/Session/Events', null, this.cancellationToken);
            } catch (error) {
                console.error('WebAppConnection: fetching session events failed', error);
                return undefined;
            }
        };

        return from(requestEvents());
    }
}
