package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Loggings;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.class */
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> implements CacheProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
    private final OracleConnection jdbcConnection;
    private final LogMinerStreamingChangeEventSourceMetrics metrics;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache;
    private static AbstractInfinispanLogMinerEventProcessor instance;

    /* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor$EventKeySortComparator.class */
    private static class EventKeySortComparator implements Comparator<String> {
        public static EventKeySortComparator INSTANCE = new EventKeySortComparator();

        private EventKeySortComparator() {
        }

        @Override // java.util.Comparator
        public int compare(String str, String str2) {
            if (str == null || !str.contains("-")) {
                throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
            }
            if (str2 == null || !str2.contains("-")) {
                throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
            }
            String[] split = str.split("-");
            String[] split2 = str2.split("-");
            int compareTo = split[0].compareTo(split2[0]);
            if (compareTo == 0) {
                compareTo = Long.compare(Long.parseLong(split[1]), Long.parseLong(split2[1]));
            }
            return compareTo;
        }
    }

    public AbstractInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext, OracleDatabaseSchema oracleDatabaseSchema, LogMinerStreamingChangeEventSourceMetrics logMinerStreamingChangeEventSourceMetrics) {
        super(changeEventSourceContext, oracleConnectorConfig, oracleDatabaseSchema, oraclePartition, oracleOffsetContext, eventDispatcher, logMinerStreamingChangeEventSourceMetrics, oracleConnection);
        this.inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
        this.jdbcConnection = oracleConnection;
        this.metrics = logMinerStreamingChangeEventSourceMetrics;
        this.partition = oraclePartition;
        this.offsetContext = oracleOffsetContext;
        this.dispatcher = eventDispatcher;
        instance = this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reCreateInMemoryCache() {
        Stream stream = mo59getTransactionCache().keySet().stream();
        try {
            stream.forEach(str -> {
                Stream stream2 = getEventCache().keySet().stream();
                try {
                    int count = (int) stream2.filter(str -> {
                        return str.startsWith(str + "-");
                    }).count();
                    LOGGER.info("Re-creating in memory cache of event count for transaction '" + str + "'. No of events found: " + count);
                    this.inMemoryPendingTransactionsCache.initKey(str, count);
                    if (stream2 != null) {
                        stream2.close();
                    }
                } catch (Throwable th) {
                    if (stream2 != null) {
                        try {
                            stream2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            if (stream != null) {
                stream.close();
            }
        } catch (Throwable th) {
            if (stream != null) {
                try {
                    stream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public static void logCacheStats() {
        if (instance != null) {
            instance.displayCacheStatistics();
        } else {
            LOGGER.trace("AbstractInfinispanLogMinerEventProcessor is not initialized, skipping logging stats.");
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider
    public void displayCacheStatistics() {
        LOGGER.info("Overall Cache Statistics:");
        LOGGER.info("\tTransactions        : {}", Integer.valueOf(mo59getTransactionCache().size()));
        LOGGER.info("\tRecent Transactions : {}", Integer.valueOf(getProcessedTransactionsCache().size()));
        LOGGER.info("\tSchema Changes      : {}", Integer.valueOf(getSchemaChangesCache().size()));
        LOGGER.info("\tEvents              : {}", Integer.valueOf(getEventCache().size()));
        if (getEventCache().isEmpty() || !LOGGER.isDebugEnabled()) {
            return;
        }
        Stream stream = getEventCache().keySet().stream();
        try {
            stream.forEach(str -> {
                LOGGER.debug("\t\tFound Key: {}", str);
            });
            if (stream != null) {
                stream.close();
            }
        } catch (Throwable th) {
            if (stream != null) {
                try {
                    stream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected boolean isRecentlyProcessed(String str) {
        return getProcessedTransactionsCache().containsKey(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public InfinispanTransaction createTransaction(LogMinerEventRow logMinerEventRow) {
        return new InfinispanTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow.getScn(), logMinerEventRow.getChangeTime(), logMinerEventRow.getUserName(), Integer.valueOf(logMinerEventRow.getThread()));
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void removeEventWithRowId(LogMinerEventRow logMinerEventRow) {
        List<String> transactionKeysWithPrefix = getTransactionKeysWithPrefix(logMinerEventRow.getTransactionId() + "-");
        if (!transactionKeysWithPrefix.isEmpty() || !isTransactionIdWithNoSequence(logMinerEventRow.getTransactionId())) {
            transactionKeysWithPrefix.sort(EventKeySortComparator.INSTANCE.reversed());
            for (String str : transactionKeysWithPrefix) {
                LogMinerEvent logMinerEvent = (LogMinerEvent) getEventCache().get(str);
                if (logMinerEvent != null && logMinerEvent.getRowId().equals(logMinerEventRow.getRowId())) {
                    LOGGER.debug("Undo applied for event {}.", logMinerEvent);
                    getEventCache().remove(str);
                    this.inMemoryPendingTransactionsCache.decrement(logMinerEventRow.getTransactionId());
                    return;
                }
            }
            Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot undo change on table '{}' since event with row-id {} was not found.", new Object[]{logMinerEventRow.getTableId(), logMinerEventRow.getRowId()});
            return;
        }
        String transactionIdPrefix = getTransactionIdPrefix(logMinerEventRow.getTransactionId());
        LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", logMinerEventRow.getTransactionId());
        LOGGER.debug("Checking all transactions with prefix '{}'", transactionIdPrefix);
        List<String> transactionKeysWithPrefix2 = getTransactionKeysWithPrefix(transactionIdPrefix);
        if (transactionKeysWithPrefix2.isEmpty()) {
            if (getConfig().isLobEnabled()) {
                return;
            }
            Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot undo change on table '{}' since transaction '{}' was not found.", new Object[]{logMinerEventRow.getTableId(), logMinerEventRow.getTransactionId()});
            return;
        }
        transactionKeysWithPrefix2.sort(EventKeySortComparator.INSTANCE.reversed());
        for (String str2 : transactionKeysWithPrefix2) {
            LogMinerEvent logMinerEvent2 = (LogMinerEvent) getEventCache().get(str2);
            if (logMinerEvent2 != null && logMinerEvent2.getRowId().equals(logMinerEventRow.getRowId())) {
                Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Undo change on table '{}' applied to transaction '{}'", new Object[]{logMinerEventRow.getTableId(), str2});
                getEventCache().remove(str2);
                this.inMemoryPendingTransactionsCache.decrement(logMinerEventRow.getTransactionId());
                return;
            }
        }
        Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot undo change on table '{}' since event with row-id {} was not found.", new Object[]{logMinerEventRow.getTableId(), logMinerEventRow.getRowId()});
    }

    private List<String> getTransactionKeysWithPrefix(String str) {
        Stream stream = getEventCache().keySet().stream();
        try {
            List<String> list = (List) stream.filter(str2 -> {
                return str2.startsWith(str);
            }).collect(Collectors.toList());
            if (stream != null) {
                stream.close();
            }
            return list;
        } catch (Throwable th) {
            if (stream != null) {
                try {
                    stream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void processRow(OraclePartition oraclePartition, LogMinerEventRow logMinerEventRow) throws SQLException, InterruptedException {
        String transactionId = logMinerEventRow.getTransactionId();
        if (isRecentlyProcessed(transactionId)) {
            LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
        } else {
            super.processRow(oraclePartition, logMinerEventRow);
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow logMinerEventRow) {
        return getSchemaChangesCache().containsKey(logMinerEventRow.getScn().toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public InfinispanTransaction getAndRemoveTransactionFromCache(String str) {
        InfinispanTransaction infinispanTransaction = (InfinispanTransaction) mo59getTransactionCache().get(str);
        if (infinispanTransaction != null) {
            mo59getTransactionCache().remove(str);
        }
        return infinispanTransaction;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction infinispanTransaction, boolean z) {
        super.cleanupAfterTransactionRemovedFromCache((AbstractInfinispanLogMinerEventProcessor) infinispanTransaction, z);
        removeEventsWithTransaction(infinispanTransaction);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public Iterator<LogMinerEvent> getTransactionEventIterator(final InfinispanTransaction infinispanTransaction) {
        return new Iterator<LogMinerEvent>() { // from class: io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.1
            private final int count;
            private LogMinerEvent nextEvent;
            private int index = 0;

            {
                this.count = infinispanTransaction.getNumberOfEvents();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                while (this.index < this.count) {
                    this.nextEvent = (LogMinerEvent) AbstractInfinispanLogMinerEventProcessor.this.getEventCache().get(infinispanTransaction.getEventId(this.index));
                    if (this.nextEvent != null) {
                        break;
                    }
                    AbstractInfinispanLogMinerEventProcessor.LOGGER.debug("Event {} must have been undone, skipped.", Integer.valueOf(this.index));
                    this.index++;
                }
                return this.index < this.count;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public LogMinerEvent next() {
                this.index++;
                return this.nextEvent;
            }
        };
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void finalizeTransactionCommit(String str, Scn scn) {
        getAbandonedTransactionsCache().remove(str);
        if (getConfig().isLobEnabled()) {
            getProcessedTransactionsCache().put(str, scn.toString());
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void finalizeTransactionRollback(String str, Scn scn) {
        InfinispanTransaction infinispanTransaction = (InfinispanTransaction) mo59getTransactionCache().get(str);
        if (infinispanTransaction != null) {
            removeEventsWithTransaction(infinispanTransaction);
            mo59getTransactionCache().remove(str);
        }
        getAbandonedTransactionsCache().remove(str);
        if (getConfig().isLobEnabled()) {
            getProcessedTransactionsCache().put(str, scn.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public void resetTransactionToStart(InfinispanTransaction infinispanTransaction) {
        super.resetTransactionToStart((AbstractInfinispanLogMinerEventProcessor) infinispanTransaction);
        mo59getTransactionCache().put(infinispanTransaction.getTransactionId(), infinispanTransaction);
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void handleSchemaChange(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        super.handleSchemaChange(logMinerEventRow);
        if (logMinerEventRow.getTableName() != null) {
            getSchemaChangesCache().put(logMinerEventRow.getScn().toString(), logMinerEventRow.getTableId().identifier());
        }
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected void addToTransaction(String str, LogMinerEventRow logMinerEventRow, Supplier<LogMinerEvent> supplier) {
        if (getAbandonedTransactionsCache().contains(str)) {
            LOGGER.warn("Event for abandoned transaction {}, skipped.", str);
            return;
        }
        if (isRecentlyProcessed(str)) {
            LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", str);
            return;
        }
        InfinispanTransaction infinispanTransaction = (InfinispanTransaction) mo59getTransactionCache().get(str);
        if (infinispanTransaction == null) {
            LOGGER.trace("Transaction {} is not in cache, creating.", str);
            infinispanTransaction = createTransaction(logMinerEventRow);
        }
        if (isTransactionOverEventThreshold(infinispanTransaction)) {
            abandonTransactionOverEventThreshold(infinispanTransaction);
            return;
        }
        String eventId = infinispanTransaction.getEventId(infinispanTransaction.getNextEventId());
        if (!getEventCache().containsKey(eventId)) {
            LOGGER.trace("Transaction {}, adding event reference at key {}", str, eventId);
            getEventCache().put(eventId, supplier.get());
            this.metrics.calculateLagFromSource(logMinerEventRow.getChangeTime());
            this.inMemoryPendingTransactionsCache.putOrIncrement(infinispanTransaction.getTransactionId());
        }
        mo59getTransactionCache().put(str, infinispanTransaction);
        this.metrics.setActiveTransactionCount(mo59getTransactionCache().size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    public int getTransactionEventCount(InfinispanTransaction infinispanTransaction) {
        return this.inMemoryPendingTransactionsCache.getNumPending(infinispanTransaction.getTransactionId()).intValue();
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected PreparedStatement createQueryStatement() throws SQLException {
        return this.jdbcConnection.connection().prepareStatement(getQueryString(), 1003, 1007, 1);
    }

    @Override // io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor
    protected Scn calculateNewStartScn(Scn scn, Scn scn2) throws InterruptedException {
        Scn scn3;
        Instant instant;
        Optional<InfinispanTransaction> oldestTransactionInCache = getOldestTransactionInCache();
        if (oldestTransactionInCache.isPresent()) {
            scn3 = oldestTransactionInCache.get().getStartScn();
            instant = oldestTransactionInCache.get().getChangeTime();
        } else {
            scn3 = Scn.NULL;
            instant = null;
        }
        if (scn3.isNull()) {
            getSchemaChangesCache().entrySet().removeIf(entry -> {
                return true;
            });
        } else {
            abandonTransactions(getConfig().getLogMiningTransactionRetention());
            purgeCache(scn3);
        }
        if (!getConfig().isLobEnabled()) {
            if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(scn) < 0) {
                scn = getLastProcessedScn();
            }
            this.offsetContext.setScn(scn);
            this.metrics.setOldestScnDetails(scn3, instant);
            this.metrics.setOffsetScn(scn);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
            return scn;
        }
        if (mo59getTransactionCache().isEmpty() && !scn2.isNull()) {
            this.offsetContext.setScn(scn2);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        } else if (!scn3.isNull()) {
            Scn scn4 = scn3;
            getProcessedTransactionsCache().entrySet().removeIf(entry2 -> {
                return Scn.valueOf((String) entry2.getValue()).compareTo(scn4) < 0;
            });
            this.offsetContext.setScn(scn3.subtract(Scn.valueOf(1)));
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        }
        return this.offsetContext.getScn();
    }

    protected abstract void purgeCache(Scn scn);

    /* JADX INFO: Access modifiers changed from: protected */
    public <K, V> void removeIf(CloseableIterator<Map.Entry<K, V>> closeableIterator, Predicate<Map.Entry<K, V>> predicate) {
        while (closeableIterator.hasNext()) {
            try {
                if (predicate.test((Map.Entry) closeableIterator.next())) {
                    closeableIterator.remove();
                }
            } catch (Throwable th) {
                if (closeableIterator != null) {
                    try {
                        closeableIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (closeableIterator != null) {
            closeableIterator.close();
        }
    }

    private void removeEventsWithTransaction(InfinispanTransaction infinispanTransaction) {
        for (int i = 0; i < infinispanTransaction.getNumberOfEvents(); i++) {
            getEventCache().remove(infinispanTransaction.getEventId(i));
        }
        this.inMemoryPendingTransactionsCache.remove(infinispanTransaction.getTransactionId());
    }
}
