import { ReplaySubject, Subscription } from 'rxjs';

import { get, set } from 'lodash';
import { SyncEvent, SyncEventStorage, SyncStorage } from './sync.model';
import { SyncService } from './sync.service';
import { mergeCopyArrays } from '@weavix/utils/src/merge-copy-arrays';

/**
 * Keeps one partition of a sync topic up to date.
 *
 * On construction the partition subscribes to live events for `topic`, fetches
 * the events recorded since the last date stored for `partition`, and applies
 * everything to `storage` in order. Every processed event is recorded in
 * `eventStorage`, which is also consulted to skip duplicates.
 */
export class SyncPartition<T extends { id?: string }> {
    /** Emits `true` once the initial fetch has been applied, and also when the partition is closed. */
    loaded$ = new ReplaySubject<boolean>(1);
    /** Emits `true` after a `reset` event has closed the partition. */
    reset$ = new ReplaySubject<boolean>(1);

    private subscription?: Subscription;
    private closed = false;
    /** Events waiting to be applied, in arrival order. */
    private updating: SyncEvent<T>[] = [];
    /** True while a `queueUpdate` call is draining `updating`. */
    private applying = false;

    /**
     * @param syncService Used to subscribe to live events and to fetch the backlog.
     * @param storage Record store the events are applied to.
     * @param eventStorage Stores the last sync date and processed sync ids per partition.
     * @param topic Topic to subscribe to and fetch from (`/sync/<topic>`).
     * @param partition Key passed to `storage` and `eventStorage` calls.
     * @param lazy Forwarded unchanged to `syncService.subscribe` and `syncService.get`.
     */
    constructor(
        private syncService: SyncService,
        private storage: SyncStorage<T>,
        private eventStorage: SyncEventStorage,
        public topic: string,
        public partition: string,
        private lazy: boolean
    ) {
        this.subscribeAndGet().catch(e => console.error(e));
    }

    /**
     * Stops the partition: drops the live subscription and emits on `loaded$`.
     * Calling it again after the first time is a no-op.
     *
     * @param reset When true, also clears the stored sync state for this partition.
     */
    async close(reset: boolean) {
        if (this.closed) return;
        this.closed = true;

        console.log(`Clearing sync partition ${this.partition}`);
        this.unsubscribe();
        this.loaded$.next(true);

        if (reset) await this.eventStorage.clear(this.partition);
    }

    /**
     * Subscribes to live events, fetches the backlog since the stored date and
     * applies both. Live events that arrive while the backlog is being fetched
     * and applied are buffered and applied right after it, so ordering is kept.
     * If the live stream errors, the whole sequence is started again unless
     * the partition has been closed.
     */
    async subscribeAndGet() {
        const date = await this.eventStorage.getDate(this.partition);
        // Non-null while the backlog is still being loaded; live events are parked here.
        let queue: SyncEvent<T>[] | null = [];

        console.log(`Initializing sync partition ${this.topic} from date ${date}`);
        await this.storage.dump(this.partition);

        const subject = await this.syncService.subscribe<SyncEvent<T>[]>(this.topic, () => this.closed, this.lazy);
        // Release any earlier subscription before replacing the handle (relevant on re-subscribe).
        this.unsubscribe();
        this.subscription = subject.subscribe(
            val => {
                if (queue) {
                    // Element-wise push: spreading a very large batch into push() can
                    // exceed the engine's argument limit.
                    for (const event of val) queue.push(event);
                } else {
                    // Log failures instead of leaving an unhandled promise rejection.
                    this.queueUpdate(val, false).catch(e => console.error(e));
                }
            },
            () => {
                // A closed partition must not be re-initialized.
                if (!this.closed) this.subscribeAndGet().catch(e => console.error(e));
            },
        );
        // close() may have run while we were awaiting; it could not see this
        // subscription then, so release it now.
        if (this.closed) this.unsubscribe();

        const data = await this.syncService.get<SyncEvent<T>[]>(
            `/sync/${this.topic}`,
            date ? { date } : {},
            () => this.closed,
            this.lazy
        );

        await this.queueUpdate(data, true);
        const remaining = queue ?? [];
        // From here on live events go straight to queueUpdate.
        queue = null;
        await this.queueUpdate(remaining, true);

        await this.storage.addPartition(this.partition);
        this.loaded$.next(true);
    }

    /** Drops the live subscription, if there is one. */
    private unsubscribe() {
        if (this.subscription) {
            this.subscription.unsubscribe();
            this.subscription = undefined;
        }
    }

    /**
     * Appends `publish` to the pending queue and drains the queue unless
     * another call is already draining it. Events are applied one at a time in
     * order; draining stops early once the partition is closed.
     *
     * If applying an event fails, the error propagates to the caller, the
     * failed event is dropped and the remaining events stay queued for the
     * next call.
     *
     * @param publish Events to enqueue.
     * @param init True for the initial load; only affects logging here and is passed to `applyUpdate`.
     */
    protected async queueUpdate(publish: SyncEvent<T>[], init: boolean) {
        if (init && publish.length) console.log(`Applying ${publish.length} events to ${this.topic}`);
        // Element-wise push: spreading a very large batch into push() can exceed
        // the engine's argument limit.
        for (const event of publish) this.updating.push(event);
        if (this.applying) return;
        this.applying = true;
        const start = Date.now();
        try {
            while (this.updating.length && !this.closed) {
                const event = this.updating.shift() as SyncEvent<T>;
                const result = this.applyUpdate(event, init);
                // Doing large amounts of promises is slow in react native... has to do with timeouts and stuff
                // So we only await when we need to (most stuff in lazy storage needs loaded once then is fine)
                if ((result as Promise<any>)?.then) await result;
                this.eventStorage.add(this.partition, event.date, event.syncId);
            }
        } finally {
            // Always release the flag, otherwise one failing event would block
            // every later queueUpdate call from processing anything.
            this.applying = false;
        }
        if (init && publish.length) console.log(`Done applying updates ${this.topic} in ${Date.now() - start} ms sync time ${publish[publish.length - 1].date}`);
    }

    /**
     * Applies a single event to storage.
     *
     * Events whose `syncId` is reported as a duplicate by `eventStorage` are
     * skipped. Unrecognized event types are logged with a warning and ignored.
     *
     * @param publish The event to apply.
     * @param init True during the initial load; suppresses per-event logging.
     * @returns Whatever the underlying storage call returns (possibly a promise), or undefined.
     */
    protected applyUpdate(publish: SyncEvent<T>, init: boolean) {
        if (publish.syncId && this.eventStorage.isDuplicate(publish.syncId)) {
            if (!init) console.log(`Sync ${publish.syncId} ignored due to duplication for ${this.topic}`);
            return;
        }

        // Field names carried by the event payload; handed to storage.update as a callback.
        const getKeys = () => Object.keys((publish as any).payload ?? {});
        const logRecordEvent = (id: unknown) => {
            if (!init) console.log(`Sync ${publish.type} event for ${this.partition} from ${this.topic} id ${id}`);
        };

        switch (publish.type) {
            case 'reset':
                console.log(`Sync reset event for ${this.topic}`);
                return this.reset();
            case 'clear':
                console.log(`Sync clear event for ${this.topic} of size ${publish.payload?.length}`);
                return this.eventStorage.clear(this.partition, publish.payload ?? []);
            case 'insert':
                if (!init) console.log(`Sync insert event for ${this.partition} id ${publish.payload.id}`);
                return this.storage.add(publish.payload as T, this.partition);
            case 'delete':
                logRecordEvent(publish.id);
                return this.storage.remove(publish.id, this.partition);
            case 'update': {
                logRecordEvent(publish.id);
                if (publish.inc || publish.set) {
                    // Field-level update: increments are applied first, then explicit sets.
                    const incSet = record => {
                        if (publish.inc) {
                            Object.keys(publish.inc).forEach(field => {
                                // A missing field counts as 0 before incrementing.
                                const current = get(record, field) ?? 0;
                                set(record, field, current + publish.inc[field]);
                            });
                        }
                        if (publish.set) {
                            Object.keys(publish.set).forEach(field => {
                                set(record, field, publish.set[field]);
                            });
                        }
                        return record;
                    };
                    return this.storage.update(publish.id, incSet, getKeys, this.partition);
                }
                return this.storage.update(publish.id, record => mergeCopyArrays(record, publish.payload), getKeys, this.partition);
            }
            case 'pull': {
                logRecordEvent(publish.id);
                // For each payload field, remove matching elements from the record's array in place.
                const pull = record => {
                    Object.keys(publish.payload).forEach(field => {
                        const array = get(record, field);
                        if (!Array.isArray(array)) return;
                        const filter = publish.payload[field as keyof T];
                        // typeof null is 'object', so null is treated as a plain value to compare.
                        const isObjectFilter = filter !== null && typeof filter === 'object';
                        // Walk backwards so splicing does not shift unvisited elements.
                        for (let i = array.length - 1; i >= 0; i--) {
                            const item = array[i];
                            const matches = isObjectFilter
                                ? Object.keys(filter as object).every(key => item?.[key] === (filter as any)[key])
                                : filter === item;
                            if (matches) array.splice(i, 1);
                        }
                    });
                    return record;
                };
                return this.storage.update(publish.id, pull, getKeys, this.partition);
            }
            case 'push': {
                logRecordEvent(publish.id);
                // For each payload field, append the value, creating the array when the field is empty.
                const push = record => {
                    Object.keys(publish.payload).forEach(field => {
                        const array = get(record, field);
                        if (!array) set(record as T, field, [publish.payload[field as keyof T]]);
                        else array.push(publish.payload[field as keyof T]);
                    });
                    return record;
                };
                return this.storage.update(publish.id, push, getKeys, this.partition);
            }
            default:
                console.warn(`${(publish as any).type} sync event not recognized`);
                return;
        }
    }

    /** Closes the partition, clearing its stored sync state, then emits on `reset$`. */
    private async reset() {
        await this.close(true);
        this.reset$.next(true);
    }
}
