package org.apache.flink.connector.kafka.sink;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/kafka/sink/KafkaCommitter.class */
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.";
    private final Properties kafkaProducerConfig;
    private final boolean reusesTransactionalIds;
    private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory;
    private final WritableBackchannel<TransactionFinished> backchannel;

    @Nullable
    private FlinkKafkaInternalProducer<?, ?> committingProducer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaCommitter(Properties properties, String str, int i, int i2, boolean z, BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> biFunction) {
        this.kafkaProducerConfig = properties;
        this.reusesTransactionalIds = z;
        this.producerFactory = biFunction;
        this.backchannel = BackchannelFactory.getInstance().getWritableBackchannel(i, i2, str);
    }

    @VisibleForTesting
    public WritableBackchannel<TransactionFinished> getBackchannel() {
        return this.backchannel;
    }

    @VisibleForTesting
    @Nullable
    FlinkKafkaInternalProducer<?, ?> getCommittingProducer() {
        return this.committingProducer;
    }

    public void commit(Collection<Committer.CommitRequest<KafkaCommittable>> collection) throws IOException, InterruptedException {
        for (Committer.CommitRequest<KafkaCommittable> commitRequest : collection) {
            KafkaCommittable kafkaCommittable = (KafkaCommittable) commitRequest.getCommittable();
            String transactionalId = kafkaCommittable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            FlinkKafkaInternalProducer<?, ?> flinkKafkaInternalProducer = null;
            try {
                flinkKafkaInternalProducer = kafkaCommittable.getProducer().orElseGet(() -> {
                    return getProducer(kafkaCommittable);
                });
                flinkKafkaInternalProducer.commitTransaction();
                this.backchannel.send(TransactionFinished.successful(kafkaCommittable.getTransactionalId()));
            } catch (RetriableException e) {
                LOG.warn("Encountered retriable exception while committing {}.", transactionalId, e);
                commitRequest.retryLater();
            } catch (UnknownProducerIdException e2) {
                LOG.error("Unable to commit transaction ({}) because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.", commitRequest, e2);
                handleFailedTransaction(flinkKafkaInternalProducer);
                commitRequest.signalFailedWithKnownReason(e2);
            } catch (ProducerFencedException e3) {
                logFencedRequest(commitRequest, e3);
                handleFailedTransaction(flinkKafkaInternalProducer);
                commitRequest.signalFailedWithKnownReason(e3);
            } catch (Exception e4) {
                LOG.error("Transaction ({}) encountered error and data has been potentially lost.", commitRequest, e4);
                closeCommitterProducer(flinkKafkaInternalProducer);
                commitRequest.signalFailedWithUnknownReason(e4);
            } catch (InvalidTxnStateException e5) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", commitRequest, e5);
                handleFailedTransaction(flinkKafkaInternalProducer);
                commitRequest.signalFailedWithKnownReason(e5);
            }
        }
    }

    private void logFencedRequest(Committer.CommitRequest<KafkaCommittable> commitRequest, ProducerFencedException producerFencedException) {
        if (this.reusesTransactionalIds) {
            LOG.warn("Unable to commit transaction ({}) because its producer is already fenced. If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt). If it's outside of recovery, this means that you either have a different sink with the same '{}' or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{commitRequest, "transactional.id", "transaction.timeout.ms", this.kafkaProducerConfig.getProperty("transaction.timeout.ms"), producerFencedException});
        } else {
            LOG.error("Unable to commit transaction ({}) because its producer is already fenced. This means that you either have a different producer with the same '{}' (this is unlikely with the '{}' as all generated ids are unique and shouldn't be reused) or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{commitRequest, "transactional.id", KafkaSink.class.getSimpleName(), "transaction.timeout.ms", this.kafkaProducerConfig.getProperty("transaction.timeout.ms"), producerFencedException});
        }
    }

    private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> flinkKafkaInternalProducer) {
        if (flinkKafkaInternalProducer == null) {
            return;
        }
        this.backchannel.send(TransactionFinished.erroneously(flinkKafkaInternalProducer.getTransactionalId()));
        closeCommitterProducer(flinkKafkaInternalProducer);
    }

    private void closeCommitterProducer(FlinkKafkaInternalProducer<?, ?> flinkKafkaInternalProducer) {
        if (flinkKafkaInternalProducer == this.committingProducer) {
            this.committingProducer.close();
            this.committingProducer = null;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            IOUtils.closeAll(new AutoCloseable[]{this.backchannel, this.committingProducer});
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    private FlinkKafkaInternalProducer<?, ?> getProducer(KafkaCommittable kafkaCommittable) {
        if (this.committingProducer == null) {
            this.committingProducer = this.producerFactory.apply(this.kafkaProducerConfig, kafkaCommittable.getTransactionalId());
        } else {
            this.committingProducer.setTransactionId(kafkaCommittable.getTransactionalId());
        }
        this.committingProducer.resumeTransaction(kafkaCommittable.getProducerId(), kafkaCommittable.getEpoch());
        return this.committingProducer;
    }
}
