package com.alibaba.otter.canal.store.memory;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.store.AbstractCanalStoreScavenge;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.CanalStoreException;
import com.alibaba.otter.canal.store.CanalStoreScavenge;
import com.alibaba.otter.canal.store.helper.CanalEventUtils;
import com.alibaba.otter.canal.store.model.BatchMode;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.class */
public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
    private static final long INIT_SQEUENCE = -1;
    private int bufferSize;
    private int bufferMemUnit;
    private int indexMask;
    private Event[] entries;
    private AtomicLong putSequence;
    private AtomicLong getSequence;
    private AtomicLong ackSequence;
    private AtomicLong putMemSize;
    private AtomicLong getMemSize;
    private AtomicLong ackMemSize;
    private ReentrantLock lock;
    private Condition notFull;
    private Condition notEmpty;
    private BatchMode batchMode;
    private boolean ddlIsolation;

    public MemoryEventStoreWithBuffer() {
        this.bufferSize = 16384;
        this.bufferMemUnit = 1024;
        this.putSequence = new AtomicLong(INIT_SQEUENCE);
        this.getSequence = new AtomicLong(INIT_SQEUENCE);
        this.ackSequence = new AtomicLong(INIT_SQEUENCE);
        this.putMemSize = new AtomicLong(0L);
        this.getMemSize = new AtomicLong(0L);
        this.ackMemSize = new AtomicLong(0L);
        this.lock = new ReentrantLock();
        this.notFull = this.lock.newCondition();
        this.notEmpty = this.lock.newCondition();
        this.batchMode = BatchMode.ITEMSIZE;
        this.ddlIsolation = false;
    }

    public MemoryEventStoreWithBuffer(BatchMode batchMode) {
        this.bufferSize = 16384;
        this.bufferMemUnit = 1024;
        this.putSequence = new AtomicLong(INIT_SQEUENCE);
        this.getSequence = new AtomicLong(INIT_SQEUENCE);
        this.ackSequence = new AtomicLong(INIT_SQEUENCE);
        this.putMemSize = new AtomicLong(0L);
        this.getMemSize = new AtomicLong(0L);
        this.ackMemSize = new AtomicLong(0L);
        this.lock = new ReentrantLock();
        this.notFull = this.lock.newCondition();
        this.notEmpty = this.lock.newCondition();
        this.batchMode = BatchMode.ITEMSIZE;
        this.ddlIsolation = false;
        this.batchMode = batchMode;
    }

    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(this.bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = this.bufferSize - 1;
        this.entries = new Event[this.bufferSize];
    }

    public void stop() throws CanalStoreException {
        super.stop();
        cleanAll();
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public void put(List<Event> list) throws InterruptedException, CanalStoreException {
        if (list == null || list.isEmpty()) {
            return;
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (!checkFreeSlotAt(this.putSequence.get() + list.size())) {
            try {
                try {
                    this.notFull.await();
                } catch (InterruptedException e) {
                    this.notFull.signal();
                    throw e;
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        doPut(list);
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public boolean put(List<Event> list, long j, TimeUnit timeUnit) throws InterruptedException, CanalStoreException {
        if (list == null || list.isEmpty()) {
            return true;
        }
        long nanos = timeUnit.toNanos(j);
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (!checkFreeSlotAt(this.putSequence.get() + list.size())) {
            try {
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = this.notFull.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    this.notFull.signal();
                    throw e;
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        doPut(list);
        reentrantLock.unlock();
        return true;
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public boolean tryPut(List<Event> list) throws CanalStoreException {
        if (list == null || list.isEmpty()) {
            return true;
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            if (!checkFreeSlotAt(this.putSequence.get() + list.size())) {
                return false;
            }
            doPut(list);
            reentrantLock.unlock();
            return true;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public void put(Event event) throws InterruptedException, CanalStoreException {
        put(Arrays.asList(event));
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public boolean put(Event event, long j, TimeUnit timeUnit) throws InterruptedException, CanalStoreException {
        return put(Arrays.asList(event), j, timeUnit);
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public boolean tryPut(Event event) throws CanalStoreException {
        return tryPut(Arrays.asList(event));
    }

    private void doPut(List<Event> list) {
        long j = this.putSequence.get();
        long size = j + list.size();
        long j2 = j;
        while (true) {
            long j3 = j2 + 1;
            if (j3 > size) {
                break;
            }
            this.entries[getIndex(j3)] = list.get((int) ((j3 - j) - 1));
            j2 = j3;
        }
        this.putSequence.set(size);
        if (this.batchMode.isMemSize()) {
            long j4 = 0;
            Iterator<Event> it = list.iterator();
            while (it.hasNext()) {
                j4 += calculateSize(it.next());
            }
            this.putMemSize.getAndAdd(j4);
        }
        this.notEmpty.signal();
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public Events<Event> get(Position position, int i) throws InterruptedException, CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (!checkUnGetSlotAt((LogPosition) position, i)) {
            try {
                try {
                    this.notEmpty.await();
                } catch (InterruptedException e) {
                    this.notEmpty.signal();
                    throw e;
                }
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }
        Events<Event> doGet = doGet(position, i);
        reentrantLock.unlock();
        return doGet;
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public Events<Event> get(Position position, int i, long j, TimeUnit timeUnit) throws InterruptedException, CanalStoreException {
        long nanos = timeUnit.toNanos(j);
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (!checkUnGetSlotAt((LogPosition) position, i)) {
            try {
                if (nanos <= 0) {
                    Events<Event> doGet = doGet(position, i);
                    reentrantLock.unlock();
                    return doGet;
                }
                try {
                    nanos = this.notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    this.notEmpty.signal();
                    throw e;
                }
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }
        Events<Event> doGet2 = doGet(position, i);
        reentrantLock.unlock();
        return doGet2;
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public Events<Event> tryGet(Position position, int i) throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            Events<Event> doGet = doGet(position, i);
            reentrantLock.unlock();
            return doGet;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    private Events<Event> doGet(Position position, int i) throws CanalStoreException {
        LogPosition logPosition = (LogPosition) position;
        long j = this.getSequence.get();
        long j2 = this.putSequence.get();
        long j3 = j;
        long j4 = j;
        if (logPosition == null || !logPosition.getPostion().isIncluded()) {
            j3++;
        }
        if (j >= j2) {
            return new Events<>();
        }
        Events<Event> events = new Events<>();
        List<Event> events2 = events.getEvents();
        long j5 = 0;
        if (!this.batchMode.isItemSize()) {
            long j6 = i * this.bufferMemUnit;
            while (true) {
                if (j5 > j6 || j3 > j2) {
                    break;
                }
                Event event = this.entries[getIndex(j3)];
                if (!this.ddlIsolation || !isDdl(event.getEntry().getHeader().getEventType())) {
                    events2.add(event);
                    j5 += calculateSize(event);
                    j4 = j3;
                    j3++;
                } else if (events2.size() == 0) {
                    events2.add(event);
                    j4 = j3;
                } else {
                    j4 = j3 - 1;
                }
            }
        } else {
            j4 = (j3 + ((long) i)) - 1 < j2 ? (j3 + i) - 1 : j2;
            while (true) {
                if (j3 > j4) {
                    break;
                }
                Event event2 = this.entries[getIndex(j3)];
                if (!this.ddlIsolation || !isDdl(event2.getEntry().getHeader().getEventType())) {
                    events2.add(event2);
                    j3++;
                } else if (events2.size() == 0) {
                    events2.add(event2);
                    j4 = j3;
                } else {
                    j4 = j3 - 1;
                }
            }
        }
        PositionRange positionRange = new PositionRange();
        events.setPositionRange(positionRange);
        positionRange.setStart(CanalEventUtils.createPosition(events2.get(0)));
        positionRange.setEnd(CanalEventUtils.createPosition(events2.get(events.getEvents().size() - 1)));
        for (int size = events2.size() - 1; size >= 0; size--) {
            Event event3 = events2.get(size);
            if (CanalEntry.EntryType.TRANSACTIONBEGIN == event3.getEntry().getEntryType() || CanalEntry.EntryType.TRANSACTIONEND == event3.getEntry().getEntryType() || isDdl(event3.getEntry().getHeader().getEventType())) {
                positionRange.setAck(CanalEventUtils.createPosition(event3));
                break;
            }
        }
        if (!this.getSequence.compareAndSet(j, j4)) {
            return new Events<>();
        }
        this.getMemSize.addAndGet(j5);
        this.notFull.signal();
        return events;
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    /* renamed from: getFirstPosition, reason: merged with bridge method [inline-methods] */
    public LogPosition mo0getFirstPosition() throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            long j = this.ackSequence.get();
            if (j == INIT_SQEUENCE && j < this.putSequence.get()) {
                LogPosition createPosition = CanalEventUtils.createPosition(this.entries[getIndex(j + 1)], false);
                reentrantLock.unlock();
                return createPosition;
            }
            if (j > INIT_SQEUENCE && j < this.putSequence.get()) {
                LogPosition createPosition2 = CanalEventUtils.createPosition(this.entries[getIndex(j + 1)], true);
                reentrantLock.unlock();
                return createPosition2;
            }
            if (j <= INIT_SQEUENCE || j != this.putSequence.get()) {
                return null;
            }
            LogPosition createPosition3 = CanalEventUtils.createPosition(this.entries[getIndex(j)], false);
            reentrantLock.unlock();
            return createPosition3;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    /* renamed from: getLatestPosition, reason: merged with bridge method [inline-methods] */
    public LogPosition mo1getLatestPosition() throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            long j = this.putSequence.get();
            if (j > INIT_SQEUENCE && j != this.ackSequence.get()) {
                LogPosition createPosition = CanalEventUtils.createPosition(this.entries[((int) this.putSequence.get()) & this.indexMask], true);
                reentrantLock.unlock();
                return createPosition;
            }
            if (j <= INIT_SQEUENCE || j != this.ackSequence.get()) {
                return null;
            }
            LogPosition createPosition2 = CanalEventUtils.createPosition(this.entries[((int) this.putSequence.get()) & this.indexMask], false);
            reentrantLock.unlock();
            return createPosition2;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public void ack(Position position) throws CanalStoreException {
        cleanUntil(position);
    }

    @Override // com.alibaba.otter.canal.store.CanalStoreScavenge
    public void cleanUntil(Position position) throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            long j = this.ackSequence.get();
            long j2 = this.getSequence.get();
            boolean z = false;
            long j3 = 0;
            for (long j4 = j + 1; j4 <= j2; j4++) {
                Event event = this.entries[getIndex(j4)];
                j3 += calculateSize(event);
                if (CanalEventUtils.checkPosition(event, (LogPosition) position)) {
                    z = true;
                    if (this.batchMode.isMemSize()) {
                        this.ackMemSize.addAndGet(j3);
                        for (long j5 = j + 1; j5 < j4; j5++) {
                            this.entries[getIndex(j5)] = null;
                        }
                    }
                    if (this.ackSequence.compareAndSet(j, j4)) {
                        this.notFull.signal();
                        reentrantLock.unlock();
                        return;
                    }
                }
            }
            if (!z) {
                throw new CanalStoreException("no match ack position" + position.toString());
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalEventStore
    public void rollback() throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            this.getSequence.set(this.ackSequence.get());
            this.getMemSize.set(this.ackMemSize.get());
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // com.alibaba.otter.canal.store.CanalStoreScavenge
    public void cleanAll() throws CanalStoreException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            this.putSequence.set(INIT_SQEUENCE);
            this.getSequence.set(INIT_SQEUENCE);
            this.ackSequence.set(INIT_SQEUENCE);
            this.putMemSize.set(0L);
            this.getMemSize.set(0L);
            this.ackMemSize.set(0L);
            this.entries = null;
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    private long getMinimumGetOrAck() {
        long j = this.getSequence.get();
        long j2 = this.ackSequence.get();
        return j2 <= j ? j2 : j;
    }

    private boolean checkFreeSlotAt(long j) {
        if (j - this.bufferSize > getMinimumGetOrAck()) {
            return false;
        }
        return !this.batchMode.isMemSize() || this.putMemSize.get() - this.ackMemSize.get() < ((long) (this.bufferSize * this.bufferMemUnit));
    }

    private boolean checkUnGetSlotAt(LogPosition logPosition, int i) {
        if (!this.batchMode.isItemSize()) {
            return this.putMemSize.get() - this.getMemSize.get() >= ((long) (i * this.bufferMemUnit));
        }
        long j = this.getSequence.get();
        long j2 = this.putSequence.get();
        long j3 = j;
        if (logPosition == null || !logPosition.getPostion().isIncluded()) {
            j3++;
        }
        return j < j2 && (j3 + ((long) i)) - 1 <= j2;
    }

    private long calculateSize(Event event) {
        return event.getEntry().getHeader().getEventLength();
    }

    private int getIndex(long j) {
        return ((int) j) & this.indexMask;
    }

    private boolean isDdl(CanalEntry.EventType eventType) {
        return eventType == CanalEntry.EventType.ALTER || eventType == CanalEntry.EventType.CREATE || eventType == CanalEntry.EventType.ERASE || eventType == CanalEntry.EventType.RENAME || eventType == CanalEntry.EventType.TRUNCATE || eventType == CanalEntry.EventType.CINDEX || eventType == CanalEntry.EventType.DINDEX;
    }

    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    public void setBufferMemUnit(int i) {
        this.bufferMemUnit = i;
    }

    public void setBatchMode(BatchMode batchMode) {
        this.batchMode = batchMode;
    }

    public void setDdlIsolation(boolean z) {
        this.ddlIsolation = z;
    }
}
