import { HttpParams } from '@angular/common/http';
import { ApplicationRef, Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { first, switchMap, takeUntil } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
import { JwtService } from '@app/auth/core/services/jwt.service';
import { HttpUrlCodec } from '@app/util/http';
import { startPingingWebSocketUntilClosed } from './helpers/start-pinging-websocket-until-closed.helper';
import { createWebSocketUrl } from './helpers/create-websocket-url.helper';
import { deserializerWithDateReviver } from './helpers/deserialize-with-date-reviver.helper';
import { backOffRetry } from '@app/util/operators/back-off-retry';
import { Store } from '@ngrx/store';
import { OrderListState } from '@app/order/routes/order-list/state/order-list.reducer';
import { OrderActionType } from '@app/order/core/state/order.actions';
import { TypedAction } from '@ngrx/store/src/models';
import { ShipmentActionType } from '@app/shipment/core/state/shipment.actions';

/**
 * Factory producing the NgRx action that is dispatched once a web socket connection has been opened.
 */
export type WebSocketConnectedActionFactory = () => TypedAction<
  OrderActionType.WEBSOCKET_CONNECTED | ShipmentActionType.WEBSOCKET_CONNECTED
>;

/**
 * Creates authenticated web socket streams that are retried through `backOffRetry` and can all be
 * torn down at once via {@link WebSocketService.closeAll}.
 */
@Injectable()
export class WebSocketService {
  // Indirection over the RxJS `webSocket` factory so unit tests can replace it.
  // TODO: this should be added for unit test only, not for component
  public webSocketFn = webSocket;

  /** Emits whenever `closeAll()` is called; every stream created by this service stops on it. */
  private readonly closeAll$ = new Subject<void>();

  constructor(
    private jwtService: JwtService,
    private applicationRef: ApplicationRef,
    private store$: Store<OrderListState>,
  ) {}

  /**
   * Opens a web socket for `path` using a valid access token and returns its message stream.
   *
   * The stream is piped through `backOffRetry` (recover predicate, maximum attempts, reset check)
   * and ends as soon as `closeAll()` is called.
   *
   * NOTE(review): the attempt-reset flag is created once per call, so it is shared by every
   * subscriber of the returned observable — confirm callers subscribe only once.
   *
   * @param path Path handed to `createWebSocketUrl` to build the socket URL.
   * @param params Query parameters handed to `createWebSocketUrl`; defaults to empty params
   *   encoded with `HttpUrlCodec`.
   * @param action Optional factory for an action to dispatch to the store each time the socket opens.
   * @returns Observable of the messages received on the socket.
   */
  public createWebSocket$<T>(
    path: string,
    params = new HttpParams({ encoder: new HttpUrlCodec() }),
    action?: WebSocketConnectedActionFactory,
  ): Observable<T> {
    const normalClosureCode = 1000;
    const recoverAttempts = 15;

    /**
     * Do not recover the connection on status code 1000: Normal closure; the connection
     * successfully completed whatever purpose for which it was created.
     */
    const canRecover = (error: CloseEvent): boolean => error.code !== normalClosureCode;

    // Set when a socket opens successfully; read (and cleared) by `canReset` below.
    let shouldResetAttempts = false;

    // Reports whether a socket has opened since the last check, clearing the flag as it reads it.
    const canReset = (): boolean => {
      const opened = shouldResetAttempts;
      shouldResetAttempts = false;

      return opened;
    };

    const resetAttempts = (): void => {
      shouldResetAttempts = true;
    };

    return this.jwtService.getValidTokens$().pipe(
      switchMap(tokens =>
        this.webSocketFactory$<T>(createWebSocketUrl(path, params), tokens.accessToken, resetAttempts, action),
      ),
      backOffRetry(canRecover, recoverAttempts, canReset),
      takeUntil(this.closeAll$),
    );
  }

  /**
   * Stops every stream previously returned by `createWebSocket$` and cancels any ping start-up
   * that is still waiting for the application to become stable.
   */
  public closeAll(): void {
    this.closeAll$.next();
  }

  /**
   * Creates an RxJS WebSocketSubject that:
   *  - Revives date strings from received messages
   *  - Pings the server to retain the connection
   *
   * Recovery from connection failures is applied by the caller (`createWebSocket$`).
   *
   * @param url Full web socket URL.
   * @param auth Access token; passed to the socket as its `protocol`.
   * @param openFn Callback invoked every time the socket opens.
   * @param action Optional factory for an action to dispatch to the store when the socket opens.
   * @returns The socket exposed as a plain observable of received messages.
   */
  private webSocketFactory$<T>(
    url: string,
    auth: string,
    openFn: () => void,
    action: WebSocketConnectedActionFactory | undefined,
  ): Observable<T> {
    const socket$ = this.webSocketFn<T>({
      url,
      deserializer: deserializerWithDateReviver,
      protocol: auth,
      openObserver: {
        next: () => {
          openFn();
          if (action) {
            this.store$.dispatch(action());
          }
        },
      },
    });

    // Start pinging only once the application has reported itself stable. `takeUntil` must come
    // AFTER `first`: it cancels the wait when `closeAll()` fires, so a socket that was already
    // closed is never pinged, and it does so without making `first` raise an EmptyError.
    this.applicationRef.isStable.pipe(first(Boolean), takeUntil(this.closeAll$)).subscribe(() => {
      startPingingWebSocketUntilClosed(socket$);
    });

    return socket$.asObservable();
  }
}
