import { Injectable, OnDestroy } from '@angular/core';
import { bufferWhen, debounceTime, distinctUntilChanged, Observable, race, skip, Subject, switchMap, takeUntil } from 'rxjs';

/**
 * Queues scanned values and releases them in batches.
 *
 * Values handed to `queueScan` accumulate until either a value arrives that differs from the one before it, or no value has
 * arrived for 600ms. The accumulated batch is then published on `bufferFlushes`, so that repeated scans of the same value can be
 * consolidated by the consumer into a single lookup and upsert request.
 */
@Injectable({
    providedIn: 'root'
})
export class ScanQueueService implements OnDestroy {
    /** Emits the array of queued scans each time the buffer is flushed. */
    bufferFlushes: Observable<Array<string>>;

    private readonly scanBuffer = new Subject<string>();
    private readonly scannedValueChanges = new Subject<void>();
    private readonly userStopsScanning = new Subject<void>();
    private readonly clearBuffer = new Subject<void>();
    private readonly flushedScans = new Subject<Array<string>>();
    private readonly unsubscribe = new Subject<void>();

    constructor() {
        // The order of these calls is significant. A Subject notifies its subscribers in the order they subscribed, so the
        // value-change signal has to be wired up before the buffering subscription: that way a changed value flushes the batch
        // collected so far *before* the new value is added, and the new value starts the next batch.
        this.signalWhenScannedValueChanges();
        this.signalWhenUserStopsScanning();
        this.requestFlushOnEitherSignal();
        this.publishBufferedScansOnFlush();

        this.bufferFlushes = this.flushedScans.asObservable();
    }

    /** Stops every internal subscription, then completes all subjects owned by the service. */
    ngOnDestroy(): void {
        // Fire the teardown notifier first so the takeUntil() in every pipeline releases its subscription.
        this.unsubscribe.next();

        const ownedSubjects: ReadonlyArray<{ complete(): void }> = [
            this.unsubscribe,
            this.scanBuffer,
            this.flushedScans,
            this.scannedValueChanges,
            this.userStopsScanning,
            this.clearBuffer
        ];

        for (const subject of ownedSubjects) {
            subject.complete();
        }
    }

    /**
     * Adds a scanned value to the queue.
     *
     * @param searchText the scanned value to buffer until the next flush
     */
    queueScan(searchText: string): void {
        this.scanBuffer.next(searchText);
    }

    /** Emits on `scannedValueChanges` whenever a queued value differs from the value queued immediately before it. */
    private signalWhenScannedValueChanges(): void {
        this.scanBuffer.asObservable()
            .pipe(
                distinctUntilChanged(),
                // distinctUntilChanged() always lets the very first value through. That value is not a change, merely the start
                // of the queue, so it is dropped to avoid flushing on the first scan.
                skip(1),
                takeUntil(this.unsubscribe)
            )
            .subscribe(() => this.scannedValueChanges.next());
    }

    /** Emits on `userStopsScanning` once no value has been queued for the length of the idle window. */
    private signalWhenUserStopsScanning(): void {
        const idleWindowMs = 600;

        this.scanBuffer.asObservable()
            .pipe(
                debounceTime(idleWindowMs),
                takeUntil(this.unsubscribe)
            )
            .subscribe(() => this.userStopsScanning.next());
    }

    /**
     * Emits on `clearBuffer` when, following a scan, either the scanned value changes or the user stops scanning.
     *
     * The wait is restarted after every scan: switchMap() discards the previous race and begins a new one.
     */
    private requestFlushOnEitherSignal(): void {
        this.scanBuffer.asObservable()
            .pipe(
                switchMap(() => race(
                    this.scannedValueChanges.asObservable(),
                    this.userStopsScanning.asObservable()
                )),
                takeUntil(this.unsubscribe)
            )
            .subscribe(() => this.clearBuffer.next());
    }

    /** Collects queued values and pushes the collected batch to `flushedScans` each time `clearBuffer` emits. */
    private publishBufferedScansOnFlush(): void {
        this.scanBuffer.asObservable()
            .pipe(
                bufferWhen(() => this.clearBuffer.asObservable()),
                takeUntil(this.unsubscribe)
            )
            .subscribe(batch => this.flushedScans.next(batch));
    }
}
