package proj.zoie.impl.indexing;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.LifeCycleCotrolledDataConsumer;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieHealth;

/* loaded from: input_file:proj/zoie/impl/indexing/AsyncDataConsumer.class */
/**
 * A {@link DataConsumer} that buffers incoming events and forwards them to a delegate consumer
 * on a dedicated background thread.
 *
 * <p>Producers call {@link #consume(Collection)}, which appends to an in-memory batch and blocks
 * while the batch already holds {@code batchSize} or more events. The background
 * {@link ConsumerThread} repeatedly swaps the batch out and passes it to the delegate set via
 * {@link #setDataConsumer(DataConsumer)}.
 *
 * <p>All mutable state is guarded by this object's monitor; the same monitor is used with
 * {@code wait}/{@code notifyAll} to coordinate producers, the consumer thread and callers of
 * {@link #syncWithVersion(long, String)}.
 *
 * @param <D> payload type carried by the data events
 */
public class AsyncDataConsumer<D> implements LifeCycleCotrolledDataConsumer<D> {
    private static final Logger log = Logger.getLogger(AsyncDataConsumer.class);

    /** Upper bound, in milliseconds, of a single wait inside {@link #syncWithVersion(long, String)}. */
    private static final long SYNC_WAIT_SLICE_MS = 5000L;

    /** How long, in milliseconds, the consumer thread waits on an empty batch before re-checking. */
    private static final long IDLE_WAIT_MS = 1000L;

    /** Delegate that receives each flushed batch; may be {@code null}, in which case batches are dropped. */
    private volatile DataConsumer<D> _consumer;
    private final Comparator<String> _versionComparator;
    /** Version recorded after the most recent flush; guarded by {@code this}. */
    private String _currentVersion = null;
    /** Highest version seen by {@link #consume(Collection)} so far. */
    private volatile String _bufferedVersion = null;
    /** Events accepted but not yet handed to the delegate; guarded by {@code this}. */
    private LinkedList<DataConsumer.DataEvent<D>> _batch = new LinkedList<>();
    private int _batchSize = 1;
    private volatile AsyncDataConsumer<D>.ConsumerThread _consumerThread = null;

    /** Background thread that keeps calling {@link AsyncDataConsumer#flushBuffer()} until terminated. */
    public final class ConsumerThread extends IndexingThread {
        // volatile: written by terminate() outside the monitor and read by run() without locking.
        volatile boolean _stop;

        ConsumerThread() {
            super("ConsumerThread");
            this._stop = false;
        }

        /** Requests the thread to stop and wakes every thread waiting on the enclosing consumer. */
        public void terminate() {
            this._stop = true;
            synchronized (AsyncDataConsumer.this) {
                AsyncDataConsumer.this.notifyAll();
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this._stop) {
                AsyncDataConsumer.this.flushBuffer();
            }
        }
    }

    /**
     * @param comparator ordering used to compare version strings
     */
    public AsyncDataConsumer(Comparator<String> comparator) {
        this._versionComparator = comparator;
    }

    /** Creates and starts a new daemon {@link ConsumerThread}. */
    @Override // proj.zoie.api.LifeCycleCotrolledDataConsumer
    public void start() {
        this._consumerThread = new ConsumerThread();
        this._consumerThread.setDaemon(true);
        this._consumerThread.start();
    }

    /** Asks the consumer thread to terminate; does nothing if {@link #start()} was never called. */
    @Override // proj.zoie.api.LifeCycleCotrolledDataConsumer
    public void stop() {
        ConsumerThread thread = this._consumerThread;
        if (thread != null) {
            thread.terminate();
        }
    }

    /**
     * Sets the delegate that receives flushed batches.
     *
     * @param dataConsumer the delegate, or {@code null} to discard flushed batches
     */
    public void setDataConsumer(DataConsumer<D> dataConsumer) {
        synchronized (this) {
            this._consumer = dataConsumer;
        }
    }

    /**
     * Sets the batch threshold at which {@link #consume(Collection)} starts blocking.
     *
     * @param i requested size; values below 1 are raised to 1
     */
    public void setBatchSize(int i) {
        synchronized (this) {
            this._batchSize = Math.max(1, i);
        }
    }

    /** @return the configured batch threshold */
    public int getBatchSize() {
        synchronized (this) {
            return this._batchSize;
        }
    }

    /** @return the number of events currently buffered and not yet taken by the consumer thread */
    public int getCurrentBatchSize() {
        synchronized (this) {
            return this._batch != null ? this._batch.size() : 0;
        }
    }

    /** @return the version recorded by the most recent flush, or {@code null} if none happened yet */
    public String getCurrentVersion() {
        synchronized (this) {
            return this._currentVersion;
        }
    }

    /**
     * Waits until everything buffered so far has been flushed.
     *
     * @param j maximum time to wait, in milliseconds
     * @throws ZoieException if the consumer is not running or the wait times out
     */
    public void flushEvents(long j) throws ZoieException {
        syncWithVersion(j, this._bufferedVersion);
    }

    /**
     * Blocks until the current version is at least {@code str}, or the timeout elapses.
     *
     * @param j   maximum time to wait, in milliseconds
     * @param str version to wait for; when {@code null} the method returns immediately
     * @throws ZoieException if {@link #start()} has not been called, or if {@code j} milliseconds
     *                       pass before the requested version is reached
     */
    public void syncWithVersion(long j, String str) throws ZoieException {
        if (this._consumerThread == null) {
            throw new ZoieException("not running");
        }
        if (str == null) {
            log.info("buffered version is NULL. Nothing to flush.");
            return;
        }
        synchronized (this) {
            long lastSeenRemaining = Long.MAX_VALUE;
            // The condition must be the loop condition: once the target version is reached the
            // method has to return instead of spinning with the monitor held.
            while (this._currentVersion == null || this._versionComparator.compare(this._currentVersion, str) < 0) {
                if (log.isDebugEnabled()) {
                    if (lastSeenRemaining > j + 5000) {
                        log.debug("syncWithVersion: timeRemaining: " + j + "ms current: " + this._currentVersion + " expecting: " + str);
                    }
                    lastSeenRemaining = j;
                }
                // Wake the consumer thread in case it is idling on an empty-batch wait.
                notifyAll();
                long waitStarted = System.currentTimeMillis();
                if (j <= 0) {
                    throw new ZoieException("sync timed out at current: " + this._currentVersion + " expecting: " + str);
                }
                try {
                    wait(Math.min(SYNC_WAIT_SLICE_MS, j));
                } catch (InterruptedException e) {
                    log.warn(e.getMessage(), e);
                }
                // Charge the time actually spent waiting against the remaining budget.
                j -= System.currentTimeMillis() - waitStarted;
            }
        }
    }

    /**
     * Appends the given events to the buffer, blocking while the buffer already holds
     * {@code batchSize} or more events.
     *
     * @param collection events to buffer; {@code null} or empty is a no-op
     * @throws ZoieException if the buffer is full and the consumer thread is missing, dead or
     *                       stopping (in which case {@code ZoieHealth.setFatal()} is also called)
     */
    @Override // proj.zoie.api.DataConsumer
    public void consume(Collection<DataConsumer.DataEvent<D>> collection) throws ZoieException {
        if (collection == null || collection.size() == 0) {
            return;
        }
        synchronized (this) {
            while (this._batch.size() >= this._batchSize) {
                if (this._consumerThread == null || !this._consumerThread.isAlive() || this._consumerThread._stop) {
                    ZoieHealth.setFatal();
                    throw new ZoieException("consumer thread has stopped");
                }
                try {
                    notifyAll();
                    wait();
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop re-evaluates the buffer and thread state.
                }
            }
            for (DataConsumer.DataEvent<D> dataEvent : collection) {
                // Track the highest version seen so far according to the comparator.
                if (this._bufferedVersion == null
                        || this._versionComparator.compare(this._bufferedVersion, dataEvent.getVersion()) < 0) {
                    this._bufferedVersion = dataEvent.getVersion();
                }
                this._batch.add(dataEvent);
            }
            if (log.isDebugEnabled()) {
                log.debug("consume:receiving: buffered: " + this._bufferedVersion);
            }
            notifyAll();
        }
    }

    /**
     * Waits for buffered events, passes them to the delegate and then publishes the new current
     * version. Returns without doing anything if the consumer thread is stopping while the
     * buffer is empty. Exceptions thrown by the delegate are logged and otherwise ignored.
     */
    protected final void flushBuffer() {
        String targetVersion;
        LinkedList<DataConsumer.DataEvent<D>> pending;

        synchronized (this) {
            while (this._batch.size() == 0) {
                if (this._consumerThread._stop) {
                    return;
                }
                try {
                    notifyAll();
                    wait(IDLE_WAIT_MS);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop re-checks the stop flag and the buffer.
                }
            }
            if (this._currentVersion == null
                    || this._versionComparator.compare(this._currentVersion, this._bufferedVersion) < 0) {
                targetVersion = this._bufferedVersion;
            } else {
                targetVersion = this._currentVersion;
            }
            // Swap in a fresh buffer so producers can continue while this batch is processed.
            pending = this._batch;
            this._batch = new LinkedList<>();
            notifyAll();
            if (log.isDebugEnabled()) {
                log.debug("flushBuffer: pre-flush: currentVersion: " + this._currentVersion + " processing version: " + targetVersion + " of size: " + pending.size());
            }
        }

        // The delegate is called WITHOUT holding the monitor: it may be slow, and holding the
        // lock here would block every producer and getter for the duration of the call.
        DataConsumer<D> delegate = this._consumer;
        if (delegate != null) {
            try {
                delegate.consume(pending);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }

        synchronized (this) {
            this._currentVersion = targetVersion;
            if (log.isDebugEnabled()) {
                log.debug("flushBuffer: post-flush: currentVersion: " + this._currentVersion);
            }
            // Wake threads blocked in syncWithVersion() waiting for this version.
            notifyAll();
        }
    }

    /** @return the highest version accepted by {@link #consume(Collection)}, or {@code null} if none */
    @Override // proj.zoie.api.DataConsumer
    public String getVersion() {
        return this._bufferedVersion;
    }

    /** @return the comparator supplied at construction time */
    @Override // proj.zoie.api.DataConsumer
    public Comparator<String> getVersionComparator() {
        return this._versionComparator;
    }
}
