/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.store.memory;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.store.AbstractCanalStoreScavenge;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.CanalStoreException;
import com.alibaba.otter.canal.store.CanalStoreScavenge;
import com.alibaba.otter.canal.store.helper.CanalEventUtils;
import com.alibaba.otter.canal.store.model.BatchMode;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;

public class MemoryEventStoreWithBuffer
extends AbstractCanalStoreScavenge
implements CanalEventStore<Event>,
CanalStoreScavenge {
    private static final long INIT_SEQUENCE = -1L;
    private int bufferSize = 16384;
    private int bufferMemUnit = 1024;
    private int indexMask;
    private Event[] entries;
    private AtomicLong putSequence = new AtomicLong(-1L);
    private AtomicLong getSequence = new AtomicLong(-1L);
    private AtomicLong ackSequence = new AtomicLong(-1L);
    private AtomicLong putMemSize = new AtomicLong(0L);
    private AtomicLong getMemSize = new AtomicLong(0L);
    private AtomicLong ackMemSize = new AtomicLong(0L);
    private AtomicLong putExecTime = new AtomicLong(System.currentTimeMillis());
    private AtomicLong getExecTime = new AtomicLong(System.currentTimeMillis());
    private AtomicLong ackExecTime = new AtomicLong(System.currentTimeMillis());
    private AtomicLong putTableRows = new AtomicLong(0L);
    private AtomicLong getTableRows = new AtomicLong(0L);
    private AtomicLong ackTableRows = new AtomicLong(0L);
    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = this.lock.newCondition();
    private Condition notEmpty = this.lock.newCondition();
    private BatchMode batchMode = BatchMode.ITEMSIZE;
    private boolean ddlIsolation = false;
    private boolean raw = true;

    public MemoryEventStoreWithBuffer() {
    }

    public MemoryEventStoreWithBuffer(BatchMode batchMode) {
        this.batchMode = batchMode;
    }

    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(this.bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = this.bufferSize - 1;
        this.entries = new Event[this.bufferSize];
    }

    public void stop() throws CanalStoreException {
        super.stop();
        this.cleanAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void put(List<Event> data) throws InterruptedException, CanalStoreException {
        if (data == null || data.isEmpty()) {
            return;
        }
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (!this.checkFreeSlotAt(this.putSequence.get() + (long)data.size())) {
                    this.notFull.await();
                }
            }
            catch (InterruptedException ie) {
                this.notFull.signal();
                throw ie;
            }
            this.doPut(data);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[TRYBLOCK]], but top level block is 7[UNCONDITIONALDOLOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean tryPut(List<Event> data) throws CanalStoreException {
        if (data == null || data.isEmpty()) {
            return true;
        }
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (!this.checkFreeSlotAt(this.putSequence.get() + (long)data.size())) {
                boolean bl = false;
                return bl;
            }
            this.doPut(data);
            boolean bl = true;
            return bl;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void put(Event data) throws InterruptedException, CanalStoreException {
        this.put(Arrays.asList(data));
    }

    @Override
    public boolean put(Event data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
        return this.put(Arrays.asList(data), timeout, unit);
    }

    @Override
    public boolean tryPut(Event data) throws CanalStoreException {
        return this.tryPut(Arrays.asList(data));
    }

    private void doPut(List<Event> data) {
        long current = this.putSequence.get();
        long end = current + (long)data.size();
        for (long next = current + 1L; next <= end; ++next) {
            this.entries[this.getIndex((long)next)] = data.get((int)(next - current - 1L));
        }
        this.putSequence.set(end);
        if (this.batchMode.isMemSize()) {
            long size = 0L;
            for (Event event : data) {
                size += this.calculateSize(event);
            }
            this.putMemSize.getAndAdd(size);
        }
        this.profiling(data, OP.PUT);
        this.notEmpty.signal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (!this.checkUnGetSlotAt((LogPosition)start, batchSize)) {
                    this.notEmpty.await();
                }
            }
            catch (InterruptedException ie) {
                this.notEmpty.signal();
                throw ie;
            }
            Events<Event> events = this.doGet(start, batchSize);
            return events;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[TRYBLOCK]], but top level block is 7[UNCONDITIONALDOLOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Events<Event> tryGet(Position start, int batchSize) throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Events<Event> events = this.doGet(start, batchSize);
            return events;
        }
        finally {
            lock.unlock();
        }
    }

    private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
        long memsize;
        Event event;
        LogPosition startPosition = (LogPosition)start;
        long current = this.getSequence.get();
        long maxAbleSequence = this.putSequence.get();
        long next = current;
        long end = current;
        if (startPosition == null || !startPosition.getPostion().isIncluded()) {
            ++next;
        }
        if (current >= maxAbleSequence) {
            return new Events<Event>();
        }
        Events<Event> result = new Events<Event>();
        List entrys = result.getEvents();
        if (this.batchMode.isItemSize()) {
            long l = end = next + (long)batchSize - 1L < maxAbleSequence ? next + (long)batchSize - 1L : maxAbleSequence;
            while (next <= end) {
                Event event2 = this.entries[this.getIndex(next)];
                if (this.ddlIsolation && this.isDdl(event2.getEventType())) {
                    if (entrys.size() == 0) {
                        entrys.add(event2);
                        end = next;
                    } else {
                        end = next - 1L;
                    }
                    break;
                }
                entrys.add(event2);
                ++next;
            }
        } else {
            long maxMemSize = batchSize * this.bufferMemUnit;
            for (memsize = 0L; memsize <= maxMemSize && next <= maxAbleSequence; memsize += this.calculateSize(event)) {
                event = this.entries[this.getIndex(next)];
                if (this.ddlIsolation && this.isDdl(event.getEventType())) {
                    if (entrys.size() == 0) {
                        entrys.add(event);
                        end = next;
                    } else {
                        end = next - 1L;
                    }
                    break;
                }
                entrys.add(event);
                end = next++;
            }
        }
        PositionRange range = new PositionRange();
        result.setPositionRange(range);
        range.setStart((Position)CanalEventUtils.createPosition((Event)entrys.get(0)));
        range.setEnd((Position)CanalEventUtils.createPosition((Event)entrys.get(result.getEvents().size() - 1)));
        range.setEndSeq(Long.valueOf(end));
        for (int i = entrys.size() - 1; i >= 0; --i) {
            event = (Event)entrys.get(i);
            if ((CanalEntry.EntryType.TRANSACTIONBEGIN != event.getEntryType() || !StringUtils.isEmpty((String)event.getGtid())) && CanalEntry.EntryType.TRANSACTIONEND != event.getEntryType() && !this.isDdl(event.getEventType())) continue;
            range.setAck((Position)CanalEventUtils.createPosition(event));
            break;
        }
        if (this.getSequence.compareAndSet(current, end)) {
            this.getMemSize.addAndGet(memsize);
            this.notFull.signal();
            this.profiling(result.getEvents(), OP.GET);
            return result;
        }
        return new Events<Event>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public LogPosition getFirstPosition() throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            long firstSeqeuence = this.ackSequence.get();
            if (firstSeqeuence == -1L && firstSeqeuence < this.putSequence.get()) {
                Event event = this.entries[this.getIndex(firstSeqeuence + 1L)];
                LogPosition logPosition = CanalEventUtils.createPosition(event, false);
                return logPosition;
            }
            if (firstSeqeuence > -1L && firstSeqeuence < this.putSequence.get()) {
                Event event = this.entries[this.getIndex(firstSeqeuence)];
                LogPosition logPosition = CanalEventUtils.createPosition(event, false);
                return logPosition;
            }
            if (firstSeqeuence > -1L && firstSeqeuence == this.putSequence.get()) {
                Event event = this.entries[this.getIndex(firstSeqeuence)];
                LogPosition logPosition = CanalEventUtils.createPosition(event, false);
                return logPosition;
            }
            LogPosition logPosition = null;
            return logPosition;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public LogPosition getLatestPosition() throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            long latestSequence = this.putSequence.get();
            if (latestSequence > -1L && latestSequence != this.ackSequence.get()) {
                Event event = this.entries[(int)this.putSequence.get() & this.indexMask];
                LogPosition logPosition = CanalEventUtils.createPosition(event, true);
                return logPosition;
            }
            if (latestSequence > -1L && latestSequence == this.ackSequence.get()) {
                Event event = this.entries[(int)this.putSequence.get() & this.indexMask];
                LogPosition logPosition = CanalEventUtils.createPosition(event, false);
                return logPosition;
            }
            LogPosition logPosition = null;
            return logPosition;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void ack(Position position) throws CanalStoreException {
        this.cleanUntil(position, -1L);
    }

    @Override
    public void ack(Position position, Long seqId) throws CanalStoreException {
        this.cleanUntil(position, seqId);
    }

    @Override
    public void cleanUntil(Position position) throws CanalStoreException {
        this.cleanUntil(position, -1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanUntil(Position position, Long seqId) throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            long sequence = this.ackSequence.get();
            long maxSequence = this.getSequence.get();
            boolean hasMatch = false;
            long memsize = 0L;
            long localExecTime = 0L;
            int deltaRows = 0;
            if (seqId > 0L) {
                maxSequence = seqId;
            }
            for (long next = sequence + 1L; next <= maxSequence; ++next) {
                Event event = this.entries[this.getIndex(next)];
                if (localExecTime == 0L && event.getExecuteTime() > 0L) {
                    localExecTime = event.getExecuteTime();
                }
                deltaRows += event.getRowsCount();
                memsize += this.calculateSize(event);
                if (seqId >= 0L && next != seqId || !CanalEventUtils.checkPosition(event, (LogPosition)position)) continue;
                hasMatch = true;
                if (this.batchMode.isMemSize()) {
                    this.ackMemSize.addAndGet(memsize);
                    for (long index = sequence + 1L; index < next; ++index) {
                        this.entries[this.getIndex((long)index)] = null;
                    }
                    Event lastEvent = this.entries[this.getIndex(next)];
                    lastEvent.setEntry(null);
                    lastEvent.setRawEntry(null);
                }
                if (!this.ackSequence.compareAndSet(sequence, next)) continue;
                this.notFull.signal();
                this.ackTableRows.addAndGet(deltaRows);
                if (localExecTime > 0L) {
                    this.ackExecTime.lazySet(localExecTime);
                }
                return;
            }
            if (!hasMatch) {
                throw new CanalStoreException("no match ack position" + position.toString());
            }
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void rollback() throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            this.getSequence.set(this.ackSequence.get());
            this.getMemSize.set(this.ackMemSize.get());
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void cleanAll() throws CanalStoreException {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            this.putSequence.set(-1L);
            this.getSequence.set(-1L);
            this.ackSequence.set(-1L);
            this.putMemSize.set(0L);
            this.getMemSize.set(0L);
            this.ackMemSize.set(0L);
            this.entries = null;
        }
        finally {
            lock.unlock();
        }
    }

    private long getMinimumGetOrAck() {
        long get = this.getSequence.get();
        long ack = this.ackSequence.get();
        return ack <= get ? ack : get;
    }

    private boolean checkFreeSlotAt(long sequence) {
        long wrapPoint = sequence - (long)this.bufferSize;
        long minPoint = this.getMinimumGetOrAck();
        if (wrapPoint > minPoint) {
            return false;
        }
        if (this.batchMode.isMemSize()) {
            long memsize = this.putMemSize.get() - this.ackMemSize.get();
            return memsize < (long)(this.bufferSize * this.bufferMemUnit);
        }
        return true;
    }

    private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) {
        if (this.batchMode.isItemSize()) {
            long current = this.getSequence.get();
            long maxAbleSequence = this.putSequence.get();
            long next = current;
            if (startPosition == null || !startPosition.getPostion().isIncluded()) {
                ++next;
            }
            return current < maxAbleSequence && next + (long)batchSize - 1L <= maxAbleSequence;
        }
        long currentSize = this.getMemSize.get();
        long maxAbleSize = this.putMemSize.get();
        return maxAbleSize - currentSize >= (long)(batchSize * this.bufferMemUnit);
    }

    private long calculateSize(Event event) {
        return event.getRawLength();
    }

    private int getIndex(long sequcnce) {
        return (int)sequcnce & this.indexMask;
    }

    private boolean isDdl(CanalEntry.EventType type) {
        return type == CanalEntry.EventType.ALTER || type == CanalEntry.EventType.CREATE || type == CanalEntry.EventType.ERASE || type == CanalEntry.EventType.RENAME || type == CanalEntry.EventType.TRUNCATE || type == CanalEntry.EventType.CINDEX || type == CanalEntry.EventType.DINDEX;
    }

    private void profiling(List<Event> events, OP op) {
        long localExecTime = 0L;
        int deltaRows = 0;
        if (events != null && !events.isEmpty()) {
            for (Event e : events) {
                if (localExecTime == 0L && e.getExecuteTime() > 0L) {
                    localExecTime = e.getExecuteTime();
                }
                deltaRows += e.getRowsCount();
            }
        }
        switch (op) {
            case PUT: {
                this.putTableRows.addAndGet(deltaRows);
                if (localExecTime <= 0L) break;
                this.putExecTime.lazySet(localExecTime);
                break;
            }
            case GET: {
                this.getTableRows.addAndGet(deltaRows);
                if (localExecTime <= 0L) break;
                this.getExecTime.lazySet(localExecTime);
                break;
            }
            case ACK: {
                this.ackTableRows.addAndGet(deltaRows);
                if (localExecTime <= 0L) break;
                this.ackExecTime.lazySet(localExecTime);
                break;
            }
        }
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public void setBufferMemUnit(int bufferMemUnit) {
        this.bufferMemUnit = bufferMemUnit;
    }

    public void setBatchMode(BatchMode batchMode) {
        this.batchMode = batchMode;
    }

    public void setDdlIsolation(boolean ddlIsolation) {
        this.ddlIsolation = ddlIsolation;
    }

    public boolean isRaw() {
        return this.raw;
    }

    public void setRaw(boolean raw) {
        this.raw = raw;
    }

    public AtomicLong getPutSequence() {
        return this.putSequence;
    }

    public AtomicLong getAckSequence() {
        return this.ackSequence;
    }

    public AtomicLong getPutMemSize() {
        return this.putMemSize;
    }

    public AtomicLong getAckMemSize() {
        return this.ackMemSize;
    }

    public BatchMode getBatchMode() {
        return this.batchMode;
    }

    public AtomicLong getPutExecTime() {
        return this.putExecTime;
    }

    public AtomicLong getGetExecTime() {
        return this.getExecTime;
    }

    public AtomicLong getAckExecTime() {
        return this.ackExecTime;
    }

    public AtomicLong getPutTableRows() {
        return this.putTableRows;
    }

    public AtomicLong getGetTableRows() {
        return this.getTableRows;
    }

    public AtomicLong getAckTableRows() {
        return this.ackTableRows;
    }

    private static enum OP {
        PUT,
        GET,
        ACK;

    }
}

