/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractWorkerSourceTask;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSourceTaskContext;
import org.apache.kafka.connect.runtime.WorkerTransactionContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExactlyOnceWorkerSourceTask
extends AbstractWorkerSourceTask {
    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);

    // Set once this task has begun a producer transaction; cleared again after a commit has been
    // attempted or the transaction has been aborted.
    private boolean transactionOpen = false;

    // Records waiting to be handed to commitTaskRecord(...), kept in insertion order.
    // Sent records map to their RecordMetadata; dropped records map to null.
    // Every access in this class is performed while synchronized on the map itself.
    private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords = new LinkedHashMap<>();

    // Decides when the current transaction should be committed (per record, per batch, or at the end).
    private final TransactionBoundaryManager transactionBoundaryManager;

    // Tracks how many records were dispatched in the current transaction.
    private final TransactionMetricsGroup transactionMetrics;

    // Hooks that run immediately before and after producer.initTransactions() in prepareToInitializeTask().
    private final Runnable preProducerCheck;
    private final Runnable postProducerCheck;

    /**
     * Creates a source task wrapper that produces records inside producer transactions.
     *
     * <p>Most arguments are forwarded unchanged to the superclass constructor. The task context handed to
     * the superclass carries a {@link WorkerTransactionContext} only when {@code sourceConfig} selects the
     * {@code CONNECTOR} transaction boundary; otherwise it carries {@code null}.
     *
     * @param sourceConfig      source connector configuration; its transaction boundary selects the
     *                          {@link TransactionBoundaryManager} used by this task
     * @param workerConfig      worker configuration; also passed to the transaction boundary manager factory
     * @param connectMetrics    metrics registry used for the transaction metrics of this task
     * @param preProducerCheck  hook run before the producer is initialized for transactions
     * @param postProducerCheck hook run after the producer has been initialized for transactions
     */
    public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
                                       SourceTask task,
                                       TaskStatus.Listener statusListener,
                                       TargetState initialState,
                                       Converter keyConverter,
                                       Converter valueConverter,
                                       HeaderConverter headerConverter,
                                       TransformationChain<SourceRecord> transformationChain,
                                       Producer<byte[], byte[]> producer,
                                       TopicAdmin admin,
                                       Map<String, TopicCreationGroup> topicGroups,
                                       CloseableOffsetStorageReader offsetReader,
                                       OffsetStorageWriter offsetWriter,
                                       ConnectorOffsetBackingStore offsetStore,
                                       WorkerConfig workerConfig,
                                       ClusterConfigState configState,
                                       ConnectMetrics connectMetrics,
                                       ClassLoader loader,
                                       Time time,
                                       RetryWithToleranceOperator retryWithToleranceOperator,
                                       StatusBackingStore statusBackingStore,
                                       SourceConnectorConfig sourceConfig,
                                       Executor closeExecutor,
                                       Runnable preProducerCheck,
                                       Runnable postProducerCheck) {
        super(
                id,
                task,
                statusListener,
                initialState,
                keyConverter,
                valueConverter,
                headerConverter,
                transformationChain,
                new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
                producer,
                admin,
                topicGroups,
                offsetReader,
                offsetWriter,
                offsetStore,
                workerConfig,
                connectMetrics,
                loader,
                time,
                retryWithToleranceOperator,
                statusBackingStore,
                closeExecutor
        );

        this.preProducerCheck = preProducerCheck;
        this.postProducerCheck = postProducerCheck;

        // The boundary manager is built before the metrics group so that a rejected configuration
        // fails before any transaction metrics are registered.
        this.transactionBoundaryManager = buildTransactionManager(
                workerConfig,
                sourceConfig,
                sourceTaskContext.transactionContext()
        );
        this.transactionMetrics = new TransactionMetricsGroup(id, connectMetrics);
    }

    /**
     * Builds the transaction context exposed to the task.
     *
     * @param sourceConfig the source connector configuration
     * @return a new {@link WorkerTransactionContext} when the configured transaction boundary is
     *         {@code CONNECTOR}; {@code null} for every other boundary
     */
    private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConfig) {
        // Enum constants are singletons, so an identity comparison is equivalent to equals() here.
        if (sourceConfig.transactionBoundary() == SourceTask.TransactionBoundary.CONNECTOR) {
            return new WorkerTransactionContext();
        }
        return null;
    }

    /**
     * Runs the pre-producer check, then initializes the producer for transactions and runs the
     * post-producer check.
     *
     * <p>If the task is already stopping once the pre-producer check has finished, neither
     * {@code initTransactions()} nor the post-producer check is invoked.
     */
    @Override
    protected void prepareToInitializeTask() {
        preProducerCheck.run();

        if (!isStopping()) {
            producer.initTransactions();
            postProducerCheck.run();
        }
    }

    /**
     * Gives the transaction boundary manager a chance to initialize its state
     * (the interval-based manager records its starting timestamp here).
     */
    @Override
    protected void prepareToEnterSendLoop() {
        transactionBoundaryManager.initialize();
    }

    /**
     * Hook override that performs no work in this class.
     */
    @Override
    protected void beginSendIteration() {
        // Intentionally empty.
    }

    /**
     * Hook override that performs no work in this class.
     */
    @Override
    protected void prepareToPollTask() {
        // Intentionally empty.
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a dropped record as commitable (with no metadata) and then lets the transaction
     * boundary manager decide whether the transaction should be committed for this record.
     *
     * @param record the record that was dropped
     */
    @Override
    protected void recordDropped(SourceRecord record) {
        synchronized (commitableRecords) {
            // A null value marks the record as having no producer metadata.
            commitableRecords.put(record, null);
        }
        transactionBoundaryManager.maybeCommitTransactionForRecord(record);
    }

    /**
     * Validates the destination topic of a record and makes sure a transaction is open before it is sent.
     *
     * @param sourceRecord   the record produced by the task
     * @param producerRecord the converted record about to be handed to the producer
     * @return always {@link Optional#empty()}
     * @throws ConnectException if the record targets the primary offsets topic of this task
     */
    @Override
    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord) {
        final boolean targetsOffsetsTopic = offsetStore.primaryOffsetsTopic().equals(producerRecord.topic());
        if (targetsOffsetsTopic) {
            throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
        }

        maybeBeginTransaction();
        return Optional.empty();
    }

    /**
     * Bookkeeping for a record that has been dispatched: stores its source offset in the offset
     * writer, counts it towards the current transaction's size, and then lets the transaction boundary
     * manager decide whether to commit.
     *
     * @param record the dispatched record
     */
    @Override
    protected void recordDispatched(SourceRecord record) {
        // Stage the offset first so that a commit triggered below includes it.
        offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
        transactionMetrics.addRecord();
        transactionBoundaryManager.maybeCommitTransactionForRecord(record);
    }

    /**
     * Lets the transaction boundary manager decide whether the transaction should be committed now
     * that a batch has been dispatched.
     */
    @Override
    protected void batchDispatched() {
        transactionBoundaryManager.maybeCommitTransactionForBatch();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Remembers a sent record together with its producer metadata so that it can be passed to
     * {@code commitTaskRecord} once the transaction has been committed.
     *
     * @param sourceRecord   the record produced by the task
     * @param producerRecord the record that was handed to the producer (unused here)
     * @param recordMetadata the metadata reported for the sent record
     */
    @Override
    protected void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
        synchronized (commitableRecords) {
            commitableRecords.put(sourceRecord, recordMetadata);
        }
    }

    /**
     * Reacts to a failed producer send.
     *
     * <p>Only synchronous failures are escalated from this method; for asynchronous failures it
     * returns without taking any action.
     *
     * @param synchronous        whether the failure was reported synchronously
     * @param producerRecord     the record that could not be sent (unused here)
     * @param preTransformRecord the original source record (unused here)
     * @param e                  the failure reported for the send
     * @throws RuntimeException wrapping {@code e} when {@code synchronous} is {@code true}
     */
    @Override
    protected void producerSendFailed(boolean synchronous, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord preTransformRecord, Exception e) {
        if (!synchronous) {
            return;
        }
        throw maybeWrapProducerSendException("Unrecoverable exception trying to send", e);
    }

    /**
     * Performs the last commit attempt for this task.
     *
     * @param failed whether the task has failed; when {@code true} no commit is attempted
     */
    @Override
    protected void finalOffsetCommit(boolean failed) {
        if (!failed) {
            transactionBoundaryManager.maybeCommitFinalTransaction();
            return;
        }
        log.debug("Skipping final offset commit as task has failed");
    }

    /**
     * Releases the metrics associated with this task.
     *
     * <p>Closes the transaction metrics owned by this class and then delegates to the superclass so
     * that metrics registered further up the hierarchy are released as well. Without the delegation
     * this override would replace the inherited clean-up entirely.
     */
    @Override
    public void removeMetrics() {
        // closeQuietly is used so that a failure here does not prevent the superclass clean-up below.
        Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
        // NOTE(review): assumes the inherited removeMetrics() is concrete and safe to call after the
        // transaction metrics group has been closed - confirm against AbstractWorkerSourceTask/WorkerTask.
        super.removeMetrics();
    }

    /**
     * Runs the inherited pause handling and then asks the transaction boundary manager whether a
     * final transaction should be committed.
     */
    @Override
    protected void onPause() {
        super.onPause();
        transactionBoundaryManager.maybeCommitFinalTransaction();
    }

    /**
     * Begins a producer transaction unless one is already marked as open.
     */
    private void maybeBeginTransaction() {
        if (transactionOpen) {
            return;
        }
        producer.beginTransaction();
        // Only flagged as open once beginTransaction() has returned normally.
        transactionOpen = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flushes pending offsets and commits the current producer transaction.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Begin an offset flush. If that succeeds, reports nothing to flush, and no transaction is
     *       open, the commit is recorded as successful and {@code commitSourceTask()} is invoked
     *       without touching the producer.</li>
     *   <li>Otherwise make sure a transaction is open and, if requested, flush the offsets.</li>
     *   <li>If no error has been recorded so far, commit the producer transaction.</li>
     *   <li>On any recorded error, record the commit failure, cancel the flush and throw.</li>
     *   <li>On success, update metrics, pass every commitable record to {@code commitTaskRecord} and
     *       invoke {@code commitSourceTask()}.</li>
     * </ol>
     *
     * @throws RuntimeException if beginning the flush, flushing offsets, or committing the producer
     *                          transaction failed
     */
    private void commitTransaction() {
        log.debug("{} Committing offsets", this);

        final long startMs = time.milliseconds();
        // Holds the first failure observed at any stage of the commit.
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        boolean offsetsToFlush = false;
        try {
            offsetsToFlush = offsetWriter.beginFlush();
        } catch (Throwable t) {
            failure.compareAndSet(null, t);
        }

        final boolean nothingToCommit = failure.get() == null && !transactionOpen && !offsetsToFlush;
        if (nothingToCommit) {
            final long elapsedMs = time.milliseconds() - startMs;
            recordCommitSuccess(elapsedMs);
            log.debug("{} Finished commitOffsets successfully in {} ms", this, elapsedMs);
            commitSourceTask();
            return;
        }

        // A transaction may have just been aborted (see the connector-defined boundary manager), in
        // which case a new one has to be opened before anything can be committed.
        maybeBeginTransaction();

        if (offsetsToFlush) {
            offsetWriter.doFlush((flushFailure, ignored) -> {
                if (flushFailure == null) {
                    log.trace("{} Finished flushing offsets to storage", this);
                } else {
                    log.error("{} Failed to flush offsets to storage: ", this, flushFailure);
                    failure.compareAndSet(null, flushFailure);
                }
            });
        }

        // The producer commit is skipped when an error has already been recorded.
        if (failure.get() == null) {
            try {
                producer.commitTransaction();
            } catch (Throwable t) {
                log.error("{} Failed to commit producer transaction", this, t);
                failure.compareAndSet(null, t);
            }
            // Cleared whether or not the commit attempt succeeded.
            transactionOpen = false;
        }

        final Throwable cause = failure.get();
        if (cause != null) {
            recordCommitFailure(time.milliseconds() - startMs, null);
            offsetWriter.cancelFlush();
            throw maybeWrapProducerSendException("Failed to flush offsets and/or records for task " + id, cause);
        }

        transactionMetrics.commitTransaction();

        final long elapsedMs = time.milliseconds() - startMs;
        recordCommitSuccess(elapsedMs);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, elapsedMs);

        // Hand every tracked record to commitTaskRecord, then reset the tracking map.
        synchronized (commitableRecords) {
            commitableRecords.forEach(this::commitTaskRecord);
            commitableRecords.clear();
        }
        commitSourceTask();
    }

    /**
     * Converts a failure into the exception that should be thrown to the caller.
     *
     * @param message message used when the failure does not look like a transaction timeout
     * @param error   the underlying failure
     * @return a transaction-timeout specific {@link ConnectException} when the failure is (or is caused
     *         by) an {@link InvalidProducerEpochException}; otherwise a {@link ConnectException} with
     *         {@code message}. In both cases {@code error} is kept as the cause.
     */
    private RuntimeException maybeWrapProducerSendException(String message, Throwable error) {
        return isPossibleTransactionTimeoutError(error)
                ? wrapTransactionTimeoutError(error)
                : new ConnectException(message, error);
    }

    /**
     * Tells whether a failure is, or is directly caused by, an {@link InvalidProducerEpochException}.
     * Only the immediate cause is inspected, not the whole cause chain.
     *
     * @param error the failure to inspect; must not be {@code null}
     * @return {@code true} if the failure or its direct cause is an {@link InvalidProducerEpochException}
     */
    private static boolean isPossibleTransactionTimeoutError(Throwable error) {
        if (error instanceof InvalidProducerEpochException) {
            return true;
        }
        return error.getCause() instanceof InvalidProducerEpochException;
    }

    /**
     * Wraps a failure in a {@link ConnectException} whose message explains that the producer
     * transaction may have expired and lists possible configuration remedies.
     *
     * @param error the underlying failure, kept as the cause
     * @return the wrapping exception
     */
    private ConnectException wrapTransactionTimeoutError(Throwable error) {
        final String explanation = "The task " + id
                + " was unable to finish writing records to Kafka before its producer transaction expired. "
                + "It may be necessary to reconfigure this connector in order for it to run healthily with exactly-once support. "
                + "Options for this include: tune the connector's producer configuration for higher throughput, "
                + "increase the transaction timeout for the connector's producers, "
                + "decrease the offset commit interval (if using interval-based transaction boundaries), "
                + "or use the 'poll' transaction boundary (if the connector is not already configured to use it).";
        return new ConnectException(explanation, error);
    }

    /**
     * @return a short description of this task containing its id
     */
    @Override
    public String toString() {
        return "ExactlyOnceWorkerSourceTask{id=" + id + "}";
    }

    /**
     * Creates the {@link TransactionBoundaryManager} matching the configured transaction boundary.
     *
     * <ul>
     *   <li>{@code POLL}: commit after every batch and for the final transaction.</li>
     *   <li>{@code INTERVAL}: commit after a batch once the configured interval has elapsed since the
     *       last interval-triggered commit, and always for the final transaction. The interval comes
     *       from the connector configuration, falling back to the worker's offset commit interval.</li>
     *   <li>{@code CONNECTOR}: commit or abort as requested by the connector through the transaction
     *       context.</li>
     * </ul>
     *
     * @param workerConfig       worker configuration supplying the fallback commit interval
     * @param sourceConfig       connector configuration supplying the boundary and optional interval
     * @param transactionContext context used by connector-defined boundaries; must be non-null for
     *                           {@code CONNECTOR}
     * @return the boundary manager for the configured boundary
     * @throws NullPointerException     if the boundary is {@code CONNECTOR} and {@code transactionContext}
     *                                  is {@code null}
     * @throws IllegalArgumentException if the boundary is not one of the recognized values
     */
    private TransactionBoundaryManager buildTransactionManager(WorkerConfig workerConfig, SourceConnectorConfig sourceConfig, final WorkerTransactionContext transactionContext) {
        final SourceTask.TransactionBoundary boundary = sourceConfig.transactionBoundary();
        switch (boundary) {
            case POLL:
                return new TransactionBoundaryManager() {
                    @Override
                    protected boolean shouldCommitFinalTransaction() {
                        return true;
                    }

                    @Override
                    protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
                        return true;
                    }
                };

            case INTERVAL: {
                final long configuredIntervalMs = Optional.ofNullable(sourceConfig.transactionBoundaryInterval())
                        .orElse(workerConfig.offsetCommitInterval());
                return new TransactionBoundaryManager() {
                    private final long commitInterval = configuredIntervalMs;
                    // Timestamp of the last interval-triggered commit (or of initialize()).
                    private long lastCommit;

                    @Override
                    public void initialize() {
                        lastCommit = ExactlyOnceWorkerSourceTask.this.time.milliseconds();
                    }

                    @Override
                    protected boolean shouldCommitFinalTransaction() {
                        return true;
                    }

                    @Override
                    protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
                        // The clock is read directly rather than using currentTimeMs.
                        final boolean intervalElapsed =
                                ExactlyOnceWorkerSourceTask.this.time.milliseconds() >= lastCommit + commitInterval;
                        if (!intervalElapsed) {
                            return false;
                        }
                        lastCommit = ExactlyOnceWorkerSourceTask.this.time.milliseconds();
                        return true;
                    }
                };
            }

            case CONNECTOR: {
                Objects.requireNonNull(transactionContext, "Transaction context must be provided when using connector-defined transaction boundaries");
                return new TransactionBoundaryManager() {
                    @Override
                    protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
                        if (!transactionContext.shouldAbortOn(record)) {
                            return transactionContext.shouldCommitOn(record);
                        }
                        log.info("Aborting transaction for record on topic {} as requested by connector", record.topic());
                        log.trace("Last record hash in aborted transaction: {}", record.hashCode());
                        abortTransaction();
                        // Returning true still triggers commitTransaction() after the abort.
                        return true;
                    }

                    @Override
                    protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
                        if (!transactionContext.shouldAbortBatch()) {
                            return transactionContext.shouldCommitBatch();
                        }
                        log.info("Aborting transaction for batch as requested by connector");
                        abortTransaction();
                        // Returning true still triggers commitTransaction() after the abort.
                        return true;
                    }

                    @Override
                    protected boolean shouldCommitFinalTransaction() {
                        // The final transaction follows the same rules as a batch boundary.
                        return shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.this.time.milliseconds());
                    }

                    // Aborts the producer transaction, resets the transaction size and marks it closed.
                    private void abortTransaction() {
                        ExactlyOnceWorkerSourceTask.this.producer.abortTransaction();
                        ExactlyOnceWorkerSourceTask.this.transactionMetrics.abortTransaction();
                        ExactlyOnceWorkerSourceTask.this.transactionOpen = false;
                    }
                };
            }

            default:
                throw new IllegalArgumentException("Unrecognized transaction boundary: " + boundary);
        }
    }

    /**
     * @return the transaction metrics tracker of this task (package-private accessor)
     */
    TransactionMetricsGroup transactionMetricsGroup() {
        return transactionMetrics;
    }

    /**
     * Counts the records added to the current transaction and, on commit, records that count in a
     * {@code transaction-size} sensor exposing average, minimum and maximum values.
     */
    static class TransactionMetricsGroup implements AutoCloseable {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor transactionSize;
        // Number of records added since the last commit or abort.
        private int size;

        /**
         * Looks up the source task metric group for the given task and registers the
         * transaction size sensor and its three statistics on it.
         *
         * @param id             the task whose connector name and task number tag the metric group
         * @param connectMetrics the metrics registry to obtain the group from
         */
        public TransactionMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
            final ConnectMetricsRegistry registry = connectMetrics.registry();
            metricGroup = connectMetrics.group(
                    registry.sourceTaskGroupName(),
                    registry.connectorTagName(), id.connector(),
                    registry.taskTagName(), Integer.toString(id.task())
            );

            transactionSize = metricGroup.sensor("transaction-size");
            transactionSize.add(metricGroup.metricName(registry.transactionSizeAvg), new Avg());
            transactionSize.add(metricGroup.metricName(registry.transactionSizeMin), new Min());
            transactionSize.add(metricGroup.metricName(registry.transactionSizeMax), new Max());
        }

        /** Closes the underlying metric group. */
        @Override
        public void close() {
            metricGroup.close();
        }

        /** Counts one more record towards the current transaction. */
        void addRecord() {
            size++;
        }

        /** Discards the current count without recording it. */
        void abortTransaction() {
            size = 0;
        }

        /** Records the current count in the sensor and starts counting from zero again. */
        void commitTransaction() {
            transactionSize.record(size);
            size = 0;
        }

        /** @return the metric group backing this tracker */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /**
     * Strategy deciding when the enclosing task commits its producer transaction.
     *
     * <p>Subclasses override the {@code shouldCommit...} predicates; each defaults to {@code false}.
     * The public {@code maybeCommit...} methods evaluate the matching predicate and, when it returns
     * {@code true}, invoke the enclosing task's {@code commitTransaction()}.
     */
    private abstract class TransactionBoundaryManager {
        private TransactionBoundaryManager() {
        }

        /** Hook for one-time setup; does nothing by default. */
        protected void initialize() {
        }

        /**
         * @param record the record that was just dispatched or dropped
         * @return whether a commit should happen for this record; {@code false} by default
         */
        protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
            return false;
        }

        /**
         * @param currentTimeMs the current time in milliseconds as read by the caller
         * @return whether a commit should happen for the batch; {@code false} by default
         */
        protected boolean shouldCommitTransactionForBatch(long currentTimeMs) {
            return false;
        }

        /** @return whether a final commit should happen; {@code false} by default */
        protected boolean shouldCommitFinalTransaction() {
            return false;
        }

        /** Commits if {@link #shouldCommitTransactionForRecord(SourceRecord)} says so. */
        public void maybeCommitTransactionForRecord(SourceRecord record) {
            final boolean commitNow = shouldCommitTransactionForRecord(record);
            maybeCommitTransaction(commitNow);
        }

        /** Commits if {@link #shouldCommitTransactionForBatch(long)} says so for the current time. */
        public void maybeCommitTransactionForBatch() {
            final long nowMs = ExactlyOnceWorkerSourceTask.this.time.milliseconds();
            maybeCommitTransaction(shouldCommitTransactionForBatch(nowMs));
        }

        /** Commits if {@link #shouldCommitFinalTransaction()} says so. */
        public void maybeCommitFinalTransaction() {
            final boolean commitNow = shouldCommitFinalTransaction();
            maybeCommitTransaction(commitNow);
        }

        // Runs the enclosing task's commit inside an offsets logging context when requested.
        private void maybeCommitTransaction(boolean shouldCommit) {
            if (!shouldCommit) {
                return;
            }
            try (LoggingContext offsetsContext = LoggingContext.forOffsets(ExactlyOnceWorkerSourceTask.this.id)) {
                ExactlyOnceWorkerSourceTask.this.commitTransaction();
            }
        }
    }
}

