import { Injectable } from '@angular/core';
import { Utils } from '@weavix/components/src/utils/utils';
import { MqttQos, MqttService, MqttTransformationType, ObservableSubscribe, TopicPayload } from '@weavix/mqtt';
import { PubSubServiceStub } from '@weavix/services/src/pub-sub.service';
import { Observable, Subscription } from 'rxjs';
import { shareReplay } from 'rxjs/operators';
import { WebMqttStorage } from 'weavix-shared/services/web-mqtt-storage';
import { environment } from '../../environments/environment';
import { Topic } from '@weavix/models/src/topic/topic';
import { HttpService } from './http.service';

/**
 * Web implementation of the pub/sub service, backed by an `MqttService` client.
 *
 * Holds a single internal MQTT client (rebuilt whenever the computed connection options change)
 * and a cache of replayable topic subscriptions keyed by the resolved topic string.
 */
@Injectable()
export class PubSubService extends PubSubServiceStub {
    /** Current MQTT client; set to `null` by `disconnect()` until the next `loggedIn()`. */
    private internalClient: MqttService;
    /** Shared replay streams keyed by resolved topic; reset whenever a new client is created. */
    private replayableSubscriptions: {[topic: string]: ObservableSubscribe<TopicPayload<any>>} = {};
    private storage = new WebMqttStorage('outgoing');
    /** Options the current client was built with; compared against to detect configuration changes. */
    private options: any = {};
    /** True once `connect()` has been requested on the current client. */
    private connecting = false;

    constructor() {
        super();
        this.initInternalClient();
    }

    /** Client id of the current MQTT client, or `undefined` while no client exists (after `disconnect()`). */
    get clientId() {
        return this.internalClient?.clientId;
    }

    /** Drops the current client, resets the connecting flag and disconnects the dropped client (if any). */
    async disconnect(): Promise<void> {
        const oldClient = this.internalClient;
        this.internalClient = null;
        this.connecting = false;
        oldClient?.disconnect();
    }

    /**
     * Ensures a client matching the current environment/auth options exists and is connecting.
     *
     * If the options are unchanged the existing client is reused and `connect()` is only called when it
     * has not been requested before. Otherwise the freshly built client is connected and the previous
     * one is disconnected.
     */
    async loggedIn(): Promise<void> {
        const oldClient = this.internalClient;
        if (!this.initInternalClient()) {
            if (!this.connecting) {
                this.internalClient.connect();
                this.connecting = true;
            }
            return;
        }
        this.internalClient.connect();
        this.connecting = true;
        oldClient?.disconnect();
    }

    /**
     * Builds the MQTT client from the current environment settings.
     *
     * @returns `false` when a client already exists and every option is strictly equal to the one it
     * was created with (nothing is changed); `true` when a new client was created.
     */
    private initInternalClient(): boolean {
        // auth happens via cookie on web so we can pass an empty string here for the auth token
        const platform = environment.teamsApp ? 'teams' : 'web';
        const options = {
            url: environment.pubsubApi,
            port: environment.pubsubPort,
            platform,
            source: `${platform}-${environment.version}`,
            storage: this.storage,
            token: environment.teamsApp && HttpService.bearerToken ? HttpService.bearerToken : '',
            timeout: 30,
            ssl: environment.is360Api.includes('https://'),
        };
        const optionKeys = Object.keys(options) as (keyof typeof options)[];
        if (this.internalClient && optionKeys.every(key => options[key] === this.options[key])) return false;
        this.options = options;
        this.internalClient = new MqttService(options);
        // Cached replay streams capture the client they were created with and stop resubscribing once
        // that client is replaced, so they must not be handed out to subscribers of the new client.
        this.replayableSubscriptions = {};
        return true;
    }

    /**
     * Publishes a payload to the topic obtained by substituting `args` into `topic`.
     *
     * `Uint8Array` payloads are sent as-is, `Blob` payloads are sent as their `arrayBuffer()`, and
     * anything else is serialized with `JSON.stringify`.
     *
     * @param topic Topic pattern containing `+` placeholders.
     * @param args Values substituted for the `+` placeholders, in order.
     * @param payload Data to publish.
     * @param qos MQTT quality of service; defaults to at-least-once.
     */
    public async publish(topic: Topic, args: string[], payload: any, qos: MqttQos = MqttQos.atLeastOnce): Promise<void> {
        const resolvedTopic = this.resolveTopic(topic, args);

        const blob = payload instanceof Uint8Array ? payload : payload instanceof Blob ? await (payload as any).arrayBuffer() : JSON.stringify(payload);

        this.internalClient.publishRaw(resolvedTopic, blob, qos);
    }

    /**
     * Subscribes to a topic and wraps the stream with `Utils.safeSubscribe` for the given component.
     *
     * @param component Passed to `Utils.safeSubscribe` (presumably ties the subscription to the
     * component's lifetime - confirm against `Utils`).
     * @param topic Topic pattern containing `+` placeholders.
     * @param args Values substituted for the `+` placeholders, in order.
     * @param retain Falsy: plain reconnecting subscription. `true` or `1`: shared stream replaying the
     * last value. Any other truthy value: shared stream with an unbounded replay buffer.
     * @param qos MQTT quality of service; defaults to at-least-once.
     * @param raw When true payloads use the raw transformation, otherwise the JSON transformation.
     * @returns The wrapped observable, carrying the `subscribed` member of the underlying subscription.
     */
    public subscribe<T>(component: any, topic: Topic, args: string[], retain: boolean | number = true, qos: MqttQos = MqttQos.atLeastOnce, raw: boolean = false): ObservableSubscribe<TopicPayload<T>> {
        const resolvedTopic = this.resolveTopic(topic, args);
        const transform = raw ? MqttTransformationType.raw : MqttTransformationType.json;

        const obs = this.subscribeInternal<T>(resolvedTopic, retain, qos, transform);

        const safeObs = Utils.safeSubscribe(component, obs);
        return Object.assign(safeObs, { subscribed: obs.subscribed });
    }

    // If client is disconnected for more than 30 minutes, the reconnect errors and has to resubscribe
    // Ideally things might want to reload intelligently instead of just re-subbing (sync service uses pass through)
    /**
     * Subscribes to an already-resolved topic and transparently resubscribes whenever the underlying
     * subscription errors, for as long as the client captured at call time is still the current one.
     *
     * Once the client has been replaced the stream stops resubscribing; it neither errors nor completes.
     *
     * @returns An observable whose `subscribed` member is taken from the subscription created at call time.
     */
    public subscribeWithReconnect<T>(topic: string, qos: MqttQos, transform: MqttTransformationType) {
        const client = this.internalClient;
        let base = client.subscribeWithTopic<T>(topic, qos as any, transform);
        const ret = new Observable(obs => {
            // Stays undefined when the very first resub() bails out because the client was replaced
            // between creating this observable and subscribing to it.
            let sub: Subscription | undefined;
            const resub = () => {
                if (this.internalClient !== client) {
                    // eslint-disable-next-line no-console
                    console.log(`Terminating ${topic}`);
                    // obs.error('disconnected');
                    return;
                }
                // eslint-disable-next-line no-console
                console.log(`Resubbing to ${topic}`);

                base = client.subscribeWithTopic<T>(topic, qos as any, transform);
                sub = base.subscribe(
                    val => obs.next(val),
                    (_: unknown) => resub(),
                );
            };
            resub();
            return () => {
                // Guarded: unsubscribing must not throw when no inner subscription was ever created.
                sub?.unsubscribe();
            };
        }) as ObservableSubscribe<TopicPayload<T>>;
        ret.subscribed = base.subscribed;
        return ret;
    }

    /** Subscribes directly on the internal client, without reconnect or replay handling. */
    public subscribePassThrough<T>(topic: string): ObservableSubscribe<T> {
        return this.internalClient.subscribe(topic);
    }

    /**
     * Chooses between a plain reconnecting subscription (falsy `retain`) and a shared replaying one.
     *
     * NOTE(review): any numeric `retain` other than 1 yields an unbounded replay buffer rather than a
     * buffer of that size - confirm this is intended.
     */
    private subscribeInternal<T>(resolvedTopic: string, retain: boolean | number, qos: MqttQos, transform: MqttTransformationType): ObservableSubscribe<TopicPayload<T>> {
        if (!retain) {
            return this.subscribeWithReconnect<T>(resolvedTopic, qos, transform);
        } else {
            const bufferSize = (retain === true || retain === 1) ? 1 : undefined; // undefined === infinite
            return this.subscribeWithReplay(resolvedTopic, bufferSize, qos, transform);
        }
    }

    /**
     * Returns the cached replaying stream for a topic, creating it on first use.
     *
     * NOTE(review): the cache is keyed by topic only, so the `bufferSize`, `qos` and `transform` of the
     * first caller win for every later caller of the same topic (e.g. raw vs. JSON) - confirm callers
     * never mix these for one topic.
     */
    private subscribeWithReplay<T>(topicString: string, bufferSize: number, qos: MqttQos, transform: MqttTransformationType): ObservableSubscribe<TopicPayload<T>> {
        if (!this.replayableSubscriptions[topicString]) {
            const subscription = this.subscribeWithReconnect(topicString, qos, transform);
            const replayedSubscription = subscription.pipe(shareReplay(bufferSize));
            this.replayableSubscriptions[topicString] = Object.assign(replayedSubscription, { subscribed: subscription.subscribed });
        }
        return this.replayableSubscriptions[topicString];
    }

    /**
     * Substitutes `args`, in order, for the `+` placeholders of `topic`.
     *
     * Each argument is inserted literally: `$`-sequences are not treated as replacement patterns, and a
     * `+` inside an argument is never consumed by a later argument. Placeholders without a matching
     * argument are left as `+`; surplus arguments are ignored.
     */
    private resolveTopic(topic: string, args: string[]): string {
        let resolved = topic.toString();
        let searchFrom = 0;
        for (const arg of args) {
            const wildcardIndex = resolved.indexOf('+', searchFrom);
            if (wildcardIndex === -1) break;
            const value = String(arg);
            resolved = resolved.slice(0, wildcardIndex) + value + resolved.slice(wildcardIndex + 1);
            // Continue after the inserted text so a '+' within it is not mistaken for a placeholder.
            searchFrom = wildcardIndex + value.length;
        }
        return resolved;
    }
}
