package proj.zoie.impl.indexing;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.DataProvider;
import proj.zoie.api.ZoieException;
import proj.zoie.mbean.DataProviderAdminMBean;

/* loaded from: input_file:proj/zoie/impl/indexing/StreamDataProvider.class */
public abstract class StreamDataProvider<D> implements DataProvider<D>, DataProviderAdminMBean {
    private static final Logger log = Logger.getLogger(StreamDataProvider.class);
    private DataThread<D> _thread;
    protected final Comparator<String> _versionComparator;
    private volatile int _retryTime = 100;
    private volatile long _maxEventsPerMinute = Long.MAX_VALUE;
    private volatile long _maxVolatileTimeInMillis = Long.MAX_VALUE;
    private int _batchSize = 1;
    private DataConsumer<D> _consumer = null;

    /* loaded from: input_file:proj/zoie/impl/indexing/StreamDataProvider$DataThread.class */
    private static final class DataThread<D> extends Thread {
        private Collection<DataConsumer.DataEvent<D>> _batch;
        private volatile String _flushedVersion;
        private volatile String _bufferedVersion;
        private final StreamDataProvider<D> _dataProvider;
        private volatile boolean _paused;
        private volatile boolean _stop;
        private final AtomicLong _eventCount;
        private volatile long _throttle;
        private volatile long _maxVolatileTimeInMillis;
        private volatile long _lastFlushTime;
        private boolean _flushing;
        private final Comparator<String> _versionComparator;
        private long lastcount;
        private final long[] last60;
        private final long[] last60slots;
        private volatile int currentslot;
        private static final int window = 3;

        private void resetEventTimer() {
            this._eventCount.set(0L);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getStatus() {
            synchronized (this) {
                return this._stop ? "stopped" : this._paused ? "paused" : "running";
            }
        }

        DataThread(StreamDataProvider<D> streamDataProvider) {
            super("Stream DataThread");
            this._eventCount = new AtomicLong(0L);
            this._throttle = 40000L;
            this._maxVolatileTimeInMillis = Long.MAX_VALUE;
            this._lastFlushTime = System.currentTimeMillis();
            this._flushing = false;
            this.lastcount = 0L;
            this.last60 = new long[60];
            this.last60slots = new long[60];
            this.currentslot = 0;
            setDaemon(false);
            this._dataProvider = streamDataProvider;
            this._flushedVersion = null;
            this._bufferedVersion = null;
            this._paused = false;
            this._stop = false;
            this._batch = new LinkedList();
            this._versionComparator = streamDataProvider._versionComparator;
        }

        @Override // java.lang.Thread
        public void start() {
            super.start();
            resetEventTimer();
        }

        void terminate() {
            this._stop = true;
            synchronized (this) {
                notifyAll();
            }
        }

        void pauseDataFeed() {
            this._paused = true;
        }

        void resumeDataFeed() {
            synchronized (this) {
                this._paused = false;
                resetEventTimer();
                notifyAll();
            }
        }

        private void flush() {
            Collection<DataConsumer.DataEvent<D>> collection = this._batch;
            this._batch = new LinkedList();
            try {
                if (((StreamDataProvider) this._dataProvider)._consumer != null) {
                    int size = collection.size();
                    ((StreamDataProvider) this._dataProvider)._consumer.consume(collection);
                    this._eventCount.getAndAdd(size);
                    updateStats();
                }
            } catch (ZoieException e) {
                StreamDataProvider.log.error(e.getMessage(), e);
            }
            this._lastFlushTime = System.currentTimeMillis();
        }

        private synchronized void updateStats() {
            long j = this._eventCount.get();
            long j2 = j - this.lastcount;
            this.lastcount = j;
            long nanoTime = System.nanoTime();
            if (nanoTime - this.last60slots[this.currentslot] > 1000000000) {
                this.currentslot = (this.currentslot + 1) % this.last60.length;
                this.last60slots[this.currentslot] = nanoTime;
                this.last60[this.currentslot] = 0;
            }
            long[] jArr = this.last60;
            int i = this.currentslot;
            jArr[i] = jArr[i] + j2;
        }

        /* JADX WARN: Code restructure failed: missing block: B:31:0x003e, code lost:
        
            throw new proj.zoie.api.ZoieException("sync timed out");
         */
        /* JADX WARN: Finally extract failed */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void syncWithVersion(long r7, java.lang.String r9) throws proj.zoie.api.ZoieException {
            /*
                r6 = this;
                r0 = r9
                if (r0 != 0) goto L5
                return
            L5:
                long r0 = java.lang.System.currentTimeMillis()
                r10 = r0
                r0 = r10
                r1 = r7
                long r0 = r0 + r1
                r12 = r0
                r0 = r6
                r1 = r0
                r14 = r1
                monitor-enter(r0)
            L15:
                r0 = r6
                java.lang.String r0 = r0._flushedVersion     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                if (r0 == 0) goto L2d
                r0 = r6
                java.util.Comparator<java.lang.String> r0 = r0._versionComparator     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r1 = r6
                java.lang.String r1 = r1._flushedVersion     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r2 = r9
                int r0 = r0.compare(r1, r2)     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                if (r0 >= 0) goto L71
            L2d:
                r0 = r10
                r1 = r12
                int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
                if (r0 < 0) goto L3f
                proj.zoie.api.ZoieException r0 = new proj.zoie.api.ZoieException     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r1 = r0
                java.lang.String r2 = "sync timed out"
                r1.<init>(r2)     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                throw r0     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
            L3f:
                r0 = r6
                r0.notifyAll()     // Catch: java.lang.InterruptedException -> L5a java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r0 = r6
                r1 = 1
                r0._flushing = r1     // Catch: java.lang.InterruptedException -> L5a java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r0 = r6
                r1 = r12
                r2 = r10
                long r1 = r1 - r2
                r2 = 200(0xc8, double:9.9E-322)
                long r1 = java.lang.Math.min(r1, r2)     // Catch: java.lang.InterruptedException -> L5a java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r0.wait(r1)     // Catch: java.lang.InterruptedException -> L5a java.lang.Throwable -> L79 java.lang.Throwable -> L89
                goto L69
            L5a:
                r15 = move-exception
                org.apache.log4j.Logger r0 = proj.zoie.impl.indexing.StreamDataProvider.access$600()     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r1 = r15
                java.lang.String r1 = r1.getMessage()     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r2 = r15
                r0.warn(r1, r2)     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
            L69:
                long r0 = java.lang.System.currentTimeMillis()     // Catch: java.lang.Throwable -> L79 java.lang.Throwable -> L89
                r10 = r0
                goto L15
            L71:
                r0 = r6
                r1 = 0
                r0._flushing = r1     // Catch: java.lang.Throwable -> L89
                goto L83
            L79:
                r16 = move-exception
                r0 = r6
                r1 = 0
                r0._flushing = r1     // Catch: java.lang.Throwable -> L89
                r0 = r16
                throw r0     // Catch: java.lang.Throwable -> L89
            L83:
                r0 = r14
                monitor-exit(r0)     // Catch: java.lang.Throwable -> L89
                goto L91
            L89:
                r17 = move-exception
                r0 = r14
                monitor-exit(r0)     // Catch: java.lang.Throwable -> L89
                r0 = r17
                throw r0
            L91:
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: proj.zoie.impl.indexing.StreamDataProvider.DataThread.syncWithVersion(long, java.lang.String):void");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this._stop) {
                updateStats();
                synchronized (this) {
                    while (!this._stop && (this._paused || getEventsPerMinute() > this._throttle)) {
                        try {
                            wait(500L);
                            updateStats();
                        } catch (InterruptedException e) {
                            Thread.interrupted();
                        }
                    }
                }
                if (!this._stop) {
                    DataConsumer.DataEvent<D> next = this._dataProvider.next();
                    if (next != null) {
                        this._bufferedVersion = this._versionComparator.compare(this._bufferedVersion, next.getVersion()) >= 0 ? this._bufferedVersion : next.getVersion();
                        synchronized (this) {
                            this._batch.add(next);
                            if (this._batch.size() >= ((StreamDataProvider) this._dataProvider)._batchSize || this._flushing || System.currentTimeMillis() - this._lastFlushTime > this._maxVolatileTimeInMillis) {
                                flush();
                                this._flushedVersion = this._bufferedVersion;
                                notifyAll();
                            }
                        }
                    } else {
                        synchronized (this) {
                            if (this._batch.size() > 0) {
                                flush();
                                this._flushedVersion = this._bufferedVersion;
                            }
                            notifyAll();
                            try {
                                wait(this._dataProvider.getRetryTime());
                            } catch (InterruptedException e2) {
                                Thread.interrupted();
                            }
                        }
                    }
                }
            }
            flush();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getEventCount() {
            return this._eventCount.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getEventsPerMinute() {
            int i = this.currentslot;
            long j = 0;
            long j2 = 0;
            for (int i2 = 0; i2 < 60; i2++) {
                int i3 = ((i - i2) + 60) % 60;
                if (i2 < window) {
                    j += this.last60[i3];
                }
                j2 += this.last60[i3];
            }
            return Math.max((j * 60) / 3, j2);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setMaxEventsPerMinute(long j) {
            this._throttle = j;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setMaxVolatileTime(long j) {
            this._maxVolatileTimeInMillis = j;
        }
    }

    public StreamDataProvider(Comparator<String> comparator) {
        this._versionComparator = comparator;
    }

    public void setRetryTime(int i) {
        this._retryTime = i;
    }

    public int getRetryTime() {
        return this._retryTime;
    }

    public void setDataConsumer(DataConsumer<D> dataConsumer) {
        this._consumer = dataConsumer;
    }

    public DataConsumer<D> getDataConsumer() {
        return this._consumer;
    }

    public abstract DataConsumer.DataEvent<D> next();

    public abstract void setStartingOffset(String str);

    public abstract void reset();

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public int getBatchSize() {
        return this._batchSize;
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public long getEventsPerMinute() {
        DataThread<D> dataThread = this._thread;
        if (dataThread == null) {
            return 0L;
        }
        return dataThread.getEventsPerMinute();
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public long getMaxEventsPerMinute() {
        return this._maxEventsPerMinute;
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public void setMaxEventsPerMinute(long j) {
        this._maxEventsPerMinute = j;
        DataThread<D> dataThread = this._thread;
        if (dataThread == null) {
            return;
        }
        dataThread.setMaxEventsPerMinute(this._maxEventsPerMinute);
    }

    public void setMaxVolatileTime(long j) {
        this._maxVolatileTimeInMillis = j;
        DataThread<D> dataThread = this._thread;
        if (dataThread == null) {
            return;
        }
        dataThread.setMaxVolatileTime(this._maxVolatileTimeInMillis);
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public String getStatus() {
        DataThread<D> dataThread = this._thread;
        return dataThread == null ? "dead" : dataThread.getStatus() + " : " + dataThread.getState();
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public void pause() {
        if (this._thread != null) {
            this._thread.pauseDataFeed();
        }
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public void resume() {
        if (this._thread != null) {
            this._thread.resumeDataFeed();
        }
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public void setBatchSize(int i) {
        this._batchSize = Math.max(1, i);
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public long getEventCount() {
        if (this._thread != null) {
            return this._thread.getEventCount();
        }
        return 0L;
    }

    public void stop() {
        if (this._thread == null || !this._thread.isAlive()) {
            return;
        }
        this._thread.terminate();
        try {
            this._thread.join();
        } catch (InterruptedException e) {
            log.warn("stopping interrupted");
        }
    }

    @Override // proj.zoie.mbean.DataProviderAdminMBean
    public void start() {
        if (this._thread == null || !this._thread.isAlive()) {
            reset();
            this._thread = new DataThread<>(this);
            this._thread.setMaxEventsPerMinute(this._maxEventsPerMinute);
            this._thread.setMaxVolatileTime(this._maxVolatileTimeInMillis);
            this._thread.start();
        }
    }

    public void syncWithVersion(long j, String str) throws ZoieException {
        this._thread.syncWithVersion(j, str);
    }
}
