import {Injectable} from '@angular/core';
import {environment} from '../../../../environments/environment';
import {ApiConstants} from '../../api.constant';

import {ARLogger, ARUserStorage} from '@relayter/core';
import {UserService} from '../users.service';
import {distinctUntilChanged, filter, map, tap} from 'rxjs/operators';
import {BehaviorSubject, combineLatest, Observable, ReplaySubject, Subject} from 'rxjs';
import {ESocketEventType, EUserEventType} from './constants/socket-events';
import {ESocketAction} from './constants/socket-actions';
import {UserStorage} from '../../../classes/user-storage.class';
import {JobModel} from '../../../models/api/job.model';
import {ERoomTypes} from './constants/socket-room-types';
import {SocketRoomManager} from './socket-room-manager';
import {io, Socket} from 'socket.io-client';

export interface ISocketMessageBody {
    room: string;
    type: ESocketEventType;
    data?: any;
}

export enum EConnectionStatus {
    UP = 'UP',
    CONNECTING = 'CONNECTING',
    DOWN = 'DOWN'
}

@Injectable({
    providedIn: 'root'
})
export class SocketService {
    public static readonly MESSAGES_DEBOUNCE_TIME = 250;
    public socket: Socket;
    private messageSubject = new Subject<ISocketMessageBody>();
    public personalMessages$: Subject<ISocketMessageBody> = new Subject<ISocketMessageBody>();
    public publicationUpdates$ = new Subject<ISocketMessageBody>();
    // UI subscribers do it after the job already is created and scheduled, and there is a change the job is already DONE on subscription.
    // So emit all job updates from 10 sec ago on subscribe
    public jobMessages$ = new ReplaySubject<JobModel>(undefined, 10000);
    private connectionStatusSubject = new BehaviorSubject<EConnectionStatus>(EConnectionStatus.DOWN);
    private disconnectSubject = new Subject<void>();
    private socketRoomManager = new SocketRoomManager(this.messageSubject);

    /**
     * Subscribes to user and team updates
     */
    constructor(private userService: UserService) {
        const connectionStatusChange = this.connectionStatusSubject.pipe(distinctUntilChanged());

        this.userService.getUserUpdates().pipe(
            map((user) => user ? {_id: user._id, team: user.team._id} : null),
            distinctUntilChanged((prevUser, currUser) =>
                prevUser?._id === currUser?._id && prevUser?.team === currUser?.team)
        ).subscribe(user => user ? this.joinRoomsForUser(user) : this.disconnect());

        // Next line will only connect if there was no connection. This ensures that if the token is expired we can retry when a new token is ready
        combineLatest([UserStorage.getAccessTokenUpdates(), connectionStatusChange]).pipe(
            filter(([accessToken, status]) => !!accessToken && status === EConnectionStatus.DOWN),
            tap(() => ARLogger.debug('SocketService: Found new access token while connection status is DOWN. Reconnecting')))
            .subscribe(() => this.connect());

        // Whenever we see a connection status change to UP we reconnect to all the rooms we were in.
        // This may cause us send a 'join-room' request multiple times, however this does not have any side effect since the server will not add the
        // user to the room multiple times.
        this.connectionStatusSubject.pipe(
            filter((status) => status === EConnectionStatus.UP)
        ).subscribe(() => this.socketRoomManager.rejoinRooms());
    }

    /**
     * Join rooms on login of the user
     * @private
     */
    private joinRoomsForUser(user: { _id: string; team: string}): void {
        // Join the team/users/{id} room.
        if (user?._id) {
            this.personalMessages$ = this.joinRoom(`${user.team}/${ERoomTypes.USERS}/${user._id}`) as Subject<ISocketMessageBody>;
        }

        // Join the team/jobs room
        this.joinRoom(`${user.team}/${ERoomTypes.JOBS}`).pipe(
            filter(message => message?.type === EUserEventType.JOB_UPDATE),
            // Map to JobModel
            map(message => new JobModel(message.data.jobId, message.data.jobType, message.data.status, message.data.result))
        ).subscribe((message) => this.jobMessages$.next(message));
    }

    /**
     * Opens socket connection, calls authenticate and sets up connection status listeners
     */
    public connect(): void {
        if (!this.socket) {
            this.connectionStatusSubject.next(EConnectionStatus.CONNECTING);
            this.socket = io(environment.API_SERVER, {path: ApiConstants.API_SOCKET_ENDPOINT, transports: ['websocket']});
            this.socketRoomManager.socket = this.socket;

            // Listen to socket events
            this.socket.on('connect', () => {
                ARLogger.debug('SocketService: Connected.. Authenticating');
                this.authenticate();
            });

            this.socket.on('disconnect', (reason) => {
                this.connectionStatusSubject.next(EConnectionStatus.DOWN);
                if (reason === 'io client disconnect') {
                    ARLogger.debug(`SocketService: Client disconnected`);
                } else if (reason === 'io server disconnect') {
                    ARLogger.debug(`SocketService: Server disconnected`);
                    this.socket = null;
                    this.socketRoomManager.socket = null;
                } else {
                    // Socket will automatically try to reconnect, so just log the event
                    ARLogger.error(`SocketService: Connection lost, reason: ${reason}`);
                }
            });

            this.socket.on('connect_error', (failure) => ARLogger.error(`SocketService: Connection failure: ${failure}`));

            this.socket.on(ESocketAction.UPDATE, (update) => this.messageSubject.next(update));

            // Listen to manager events
            this.socket.io.on('reconnect_attempt', (attemptNumber) => ARLogger.debug(`SocketService: Reconnecting... attempt: ${attemptNumber}`));

            this.socket.io.on('reconnect_failed', () => {
                // TODO: set max reconnection attempts?
                ARLogger.error('SocketService: Failed to reconnect to websocket');
                this.socket = null;
                this.socketRoomManager.socket = null;
            });

            this.socket.io.on('reconnect', () => ARLogger.error('SocketService: Successful reconnection'));
            this.socket.io.on('reconnect_error', (error) => ARLogger.error(`SocketService: Reconnect error: ${error}`));
            this.socket.io.on('error', (error) => ARLogger.error(`SocketService: Error: ${error}`));
        }
    }

    /**
     * Sends an 'authenticate' event with jwt token and callback
     */
    private authenticate(): void {
        const token = ARUserStorage.getAccessToken();
        this.socket.emit(ESocketAction.AUTHENTICATE, {token}, (ack: boolean) => {
            ARLogger.debug(ack ? 'SocketService: Authenticated' : 'SocketService: Not authenticated');
            this.connectionStatusSubject.next(ack ? EConnectionStatus.UP : EConnectionStatus.DOWN);
        });
    }

    /**
     * Called when user is logged out
     */
    private disconnect(): void {
        this.disconnectSubject.next();
        if (this.socket) {
            this.socketRoomManager.clear();
            this.socket.disconnect();
            this.socket = null;
            this.socketRoomManager.socket = null;
            this.connectionStatusSubject.next(EConnectionStatus.DOWN);
        }
    }

    /**
     * Join a room
     */
    public joinRoom(roomId: string): Observable<ISocketMessageBody> {
        return this.socketRoomManager.joinRoom(roomId);
    }

    /**
     * Leave the room
     */
    public leaveRoom(roomId: string): void {
        this.socketRoomManager.leaveRoom(roomId);
    }
}
