package org.apache.flink.connector.kafka.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyContextImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
import org.apache.flink.connector.kafka.util.AdminUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.class */
/**
 * {@link KafkaWriter} that writes through transactional producers.
 *
 * <p>{@link #initialize()} aborts lingering transactions and opens the first transaction; every
 * {@link #snapshotState(long)} recycles producers reported as finished through the backchannel,
 * records the still-ongoing transactions in the returned state and opens the next transaction.
 *
 * @param <IN> type of the records handled by the record serializer
 */
public class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);

    /** Prefix passed to the naming strategy context and stored in every snapshot. */
    private final String transactionalIdPrefix;

    /** Strategy invoked once from {@link #initialize()} to abort lingering transactions. */
    private final TransactionAbortStrategyImpl transactionAbortStrategy;

    /** Strategy that hands out the transactional producer for each new transaction. */
    private final TransactionNamingStrategyImpl transactionNamingStrategy;

    /** Writer states handed to the constructor; consulted again when aborting transactions. */
    private final Collection<KafkaWriterState> recoveredStates;

    /** Restored checkpoint id reported by the init context, or {@code 0} if there is none. */
    private final long restoredCheckpointId;

    /** Pool that creates, tracks and recycles the transactional producers. */
    private final ProducerPool producerPool;

    /** Source of {@link TransactionFinished} notifications drained in {@link #snapshotState}. */
    private final ReadableBackchannel<TransactionFinished> backchannel;

    /** Mutable context updated before each call into the naming strategy. */
    private final TransactionNamingStrategyContextImpl namingContext;

    /** Value reported by the ownership model; stored in snapshots and the abort context. */
    private final int totalNumberOfOwnedSubtasks;

    /** Subtask ids reported by the ownership model; one snapshot entry is written per id. */
    private final int[] ownedSubtaskIds;

    /** Created lazily by {@link #getAdminClient()}; {@code null} until first requested. */
    private AdminClient adminClient;

    /**
     * Creates the writer and all of its collaborators (producer pool, backchannel, naming
     * context). No transaction is started here; that happens in {@link #initialize()}.
     *
     * @param deliveryGuarantee forwarded to the superclass
     * @param properties Kafka producer configuration, forwarded to the superclass and the pool
     * @param str the transactional id prefix; must not be {@code null}
     * @param writerInitContext supplies the task info and the restored checkpoint id
     * @param kafkaRecordSerializationSchema record serializer; opened by this constructor
     * @param initializationContext context used to open the record serializer
     * @param transactionAbortStrategyImpl abort strategy; must not be {@code null}
     * @param transactionNamingStrategyImpl naming strategy; must not be {@code null}
     * @param collection recovered writer states; must not be {@code null}
     * @throws FlinkRuntimeException if any step after the three strategy/prefix null checks fails
     */
    public ExactlyOnceKafkaWriter(DeliveryGuarantee deliveryGuarantee, Properties properties, String str, WriterInitContext writerInitContext, KafkaRecordSerializationSchema<IN> kafkaRecordSerializationSchema, SerializationSchema.InitializationContext initializationContext, TransactionAbortStrategyImpl transactionAbortStrategyImpl, TransactionNamingStrategyImpl transactionNamingStrategyImpl, Collection<KafkaWriterState> collection) {
        super(deliveryGuarantee, properties, writerInitContext, kafkaRecordSerializationSchema, initializationContext);
        this.transactionalIdPrefix = Preconditions.checkNotNull(str, "transactionalIdPrefix must not be null");
        this.transactionAbortStrategy = Preconditions.checkNotNull(transactionAbortStrategyImpl, "transactionAbortStrategy must not be null");
        this.transactionNamingStrategy = Preconditions.checkNotNull(transactionNamingStrategyImpl, "transactionNamingStrategy must not be null");
        // NOTE(review): the try block spans the whole remaining setup, so unrelated failures
        // (e.g. a null recovered-state collection) are also reported as "Cannot initialize
        // schema.". Kept as-is because narrowing it would change the exception type callers see.
        try {
            kafkaRecordSerializationSchema.open(initializationContext, this.kafkaSinkContext);
            this.recoveredStates = Preconditions.checkNotNull(collection, "recoveredStates");

            TaskInfo taskInfo = writerInitContext.getTaskInfo();
            TransactionOwnership ownership = transactionNamingStrategyImpl.getOwnership();
            int subtaskIndex = taskInfo.getIndexOfThisSubtask();
            int parallelism = taskInfo.getNumberOfParallelSubtasks();
            this.ownedSubtaskIds = ownership.getOwnedSubtaskIds(subtaskIndex, parallelism, collection);
            this.totalNumberOfOwnedSubtasks = ownership.getTotalNumberOfOwnedSubtasks(subtaskIndex, parallelism, collection);

            initFlinkMetrics();
            this.restoredCheckpointId = writerInitContext.getRestoredCheckpointId().orElse(0L);
            // The pool is seeded with the precommitted transactions of all recovered states.
            this.producerPool =
                    new ProducerPoolImpl(
                            properties,
                            this::initKafkaMetrics,
                            collection.stream()
                                    .flatMap(state -> state.getPrecommittedTransactionalIds().stream())
                                    .collect(Collectors.toList()));
            this.backchannel = BackchannelFactory.getInstance().getReadableBackchannel(subtaskIndex, taskInfo.getAttemptNumber(), str);
            this.namingContext = new TransactionNamingStrategyContextImpl(str, this.ownedSubtaskIds[0], this.restoredCheckpointId, this.producerPool);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", e);
        }
    }

    /**
     * Aborts lingering transactions and starts the transaction for checkpoint
     * {@code restoredCheckpointId + 1}. If anything fails, the writer is closed and the original
     * failure is rethrown with any close failure attached as suppressed.
     */
    @Override // org.apache.flink.connector.kafka.sink.KafkaWriter
    public void initialize() {
        final long firstCheckpointId = this.restoredCheckpointId + 1;
        try {
            abortLingeringTransactions(Preconditions.checkNotNull(this.recoveredStates, "recoveredStates"), firstCheckpointId);
            this.currentProducer = startTransaction(firstCheckpointId);
        } catch (Throwable failure) {
            try {
                close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Obtains a transactional producer from the naming strategy and begins a transaction on it.
     *
     * @param checkpointId id published to the naming context as next and then last checkpoint id
     * @return the producer on which {@code beginTransaction()} has been called
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
        this.namingContext.setNextCheckpointId(checkpointId);
        this.namingContext.setOngoingTransactions(
                this.producerPool.getOngoingTransactions().stream()
                        .map(CheckpointTransaction::getTransactionalId)
                        .collect(Collectors.toSet()));
        FlinkKafkaInternalProducer<byte[], byte[]> producer =
                this.transactionNamingStrategy.getTransactionalProducer(this.namingContext);
        // The last checkpoint id is only advanced after the strategy has picked a producer.
        this.namingContext.setLastCheckpointId(checkpointId);
        producer.beginTransaction();
        return producer;
    }

    /**
     * Precommits the current transaction and returns its committable. If the current transaction
     * has no records, the producer is recycled instead and nothing is returned.
     *
     * @return a singleton list with the committable, or an empty list
     */
    @Override // org.apache.flink.connector.kafka.sink.KafkaWriter
    public Collection<KafkaCommittable> prepareCommit() {
        if (!this.currentProducer.hasRecordsInTransaction()) {
            this.producerPool.recycle(this.currentProducer);
            return Collections.emptyList();
        }
        KafkaCommittable committable = KafkaCommittable.of(this.currentProducer);
        LOG.debug("Prepare {}.", committable);
        this.currentProducer.precommitTransaction();
        return Collections.singletonList(committable);
    }

    /**
     * Drains the backchannel, recycling every producer whose transaction finished, then captures
     * the ongoing transactions, starts the transaction for checkpoint {@code j + 1} and returns
     * the state snapshots.
     *
     * @param j id of the checkpoint being snapshotted
     * @return one state per owned subtask id
     */
    @Override // org.apache.flink.connector.kafka.sink.KafkaWriter
    public List<KafkaWriterState> snapshotState(long j) throws IOException {
        TransactionFinished finished;
        while ((finished = this.backchannel.poll()) != null) {
            this.producerPool.recycleByTransactionId(finished.getTransactionId(), finished.isSuccess());
        }
        // Captured before the next transaction is started, so the snapshot is taken first.
        Collection<CheckpointTransaction> ongoingTransactions = this.producerPool.getOngoingTransactions();
        this.currentProducer = startTransaction(j + 1);
        return createSnapshots(ongoingTransactions);
    }

    /**
     * Builds one {@link KafkaWriterState} per owned subtask id. Only the first entry carries the
     * given transactions; all other entries carry an empty list.
     */
    private List<KafkaWriterState> createSnapshots(Collection<CheckpointTransaction> ongoingTransactions) {
        List<KafkaWriterState> states = new ArrayList<>(this.ownedSubtaskIds.length);
        for (int index = 0; index < this.ownedSubtaskIds.length; index++) {
            states.add(
                    new KafkaWriterState(
                            this.transactionalIdPrefix,
                            this.ownedSubtaskIds[index],
                            this.totalNumberOfOwnedSubtasks,
                            this.transactionNamingStrategy.getOwnership(),
                            index == 0 ? ongoingTransactions : List.of()));
        }
        LOG.debug("Snapshotting state {}", states);
        return states;
    }

    /**
     * Aborts the current transaction (if any), then closes the producer pool, the backchannel,
     * the lazily created admin client and finally the superclass resources.
     */
    @Override // org.apache.flink.connector.kafka.sink.KafkaWriter
    public void close() throws Exception {
        IOUtils.closeAll(
                this::abortCurrentProducer,
                () -> IOUtils.closeAll(this.producerPool),
                this.backchannel,
                // Previously never closed although getAdminClient() may have created it.
                this::closeAdminClient,
                super::close);
    }

    /** Closes the admin client if {@link #getAdminClient()} ever created one. */
    private void closeAdminClient() {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    /**
     * Aborts the transaction of the current producer when it contains records. A
     * {@link ProducerFencedException} during the abort is logged at debug level and ignored.
     */
    private void abortCurrentProducer() {
        // initialize() calls close() when aborting lingering transactions fails, i.e. before this
        // class has assigned currentProducer; guard against it still being null at that point.
        if (this.currentProducer == null || !this.currentProducer.hasRecordsInTransaction()) {
            return;
        }
        try {
            this.currentProducer.abortTransaction();
        } catch (ProducerFencedException e) {
            LOG.debug("Producer {} fenced while aborting", this.currentProducer.getTransactionalId());
        }
    }

    @VisibleForTesting
    ProducerPool getProducerPool() {
        return this.producerPool;
    }

    @VisibleForTesting
    public String getTransactionalIdPrefix() {
        return this.transactionalIdPrefix;
    }

    /**
     * Runs the abort strategy for the current prefix and, if the first recovered state was
     * written with a different prefix, for that previous prefix as well.
     *
     * @param states recovered writer states
     * @param startCheckpointId checkpoint id passed on to the abort strategy context
     */
    private void abortLingeringTransactions(Collection<KafkaWriterState> states, long startCheckpointId) {
        List<String> prefixesToAbort = new ArrayList<>();
        prefixesToAbort.add(this.transactionalIdPrefix);
        LOG.info("Aborting lingering transactions from previous execution. Recovered states: {}.", states);
        states.stream()
                .findFirst()
                .map(KafkaWriterState::getTransactionalIdPrefix)
                .filter(previousPrefix -> !previousPrefix.equals(this.transactionalIdPrefix))
                .ifPresent(
                        previousPrefix -> {
                            prefixesToAbort.add(previousPrefix);
                            LOG.warn("Transactional id prefix from previous execution {} has changed to {}.", previousPrefix, this.transactionalIdPrefix);
                        });
        LOG.info("Aborting lingering transactions with prefixes {} using {}", prefixesToAbort, this.transactionAbortStrategy);
        this.transactionAbortStrategy.abortTransactions(getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort));
    }

    /**
     * Assembles the context consumed by the abort strategy: topic supplier, subtask information,
     * prefixes, start checkpoint id, a per-transaction aborter, the admin client supplier and the
     * transactional ids that are precommitted according to the recovered states.
     */
    private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(long startCheckpointId, List<String> prefixesToAbort) {
        return new TransactionAbortStrategyContextImpl(
                this::getTopicNames,
                this.kafkaSinkContext.getParallelInstanceId(),
                this.kafkaSinkContext.getNumberOfParallelInstances(),
                this.ownedSubtaskIds,
                this.totalNumberOfOwnedSubtasks,
                prefixesToAbort,
                startCheckpointId,
                transactionalId -> {
                    // Borrow a producer for the id, flush it, read its epoch and give it back.
                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
                            this.producerPool.getTransactionalProducer(transactionalId, 0L);
                    LOG.debug("Aborting transaction {}", transactionalId);
                    producer.flush();
                    short epoch = producer.getEpoch();
                    this.producerPool.recycle(producer);
                    return epoch;
                },
                this::getAdminClient,
                this.recoveredStates.stream()
                        .flatMap(
                                state ->
                                        state.getPrecommittedTransactionalIds().stream()
                                                .map(transaction -> transaction.getTransactionalId()))
                        .collect(Collectors.toSet()));
    }

    /**
     * Returns the target topics: the identifier's explicit topic list when present, otherwise the
     * topics matching the identifier's pattern as resolved through the admin client.
     *
     * @throws IllegalStateException if the record serializer exposes no dataset identifier
     */
    private Collection<String> getTopicNames() {
        KafkaDatasetIdentifier identifier =
                getDatasetIdentifier()
                        .orElseThrow(() -> new IllegalStateException("The record serializer does not expose a static list of target topics."));
        if (identifier.getTopics() != null) {
            return identifier.getTopics();
        }
        return AdminUtils.getTopicsByPattern(getAdminClient(), identifier.getTopicPattern());
    }

    /**
     * Returns the topic identifier of the serializer's dataset facet, or an empty optional when
     * the serializer is not a {@link KafkaDatasetFacetProvider}.
     */
    private Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
        if (!(this.recordSerializer instanceof KafkaDatasetFacetProvider)) {
            return Optional.empty();
        }
        return ((KafkaDatasetFacetProvider) this.recordSerializer)
                .getKafkaDatasetFacet()
                .map(facet -> facet.getTopicIdentifier());
    }

    /** Returns the admin client, creating it from the producer config on first use. */
    private Admin getAdminClient() {
        if (this.adminClient == null) {
            this.adminClient = AdminClient.create(this.kafkaProducerConfig);
        }
        return this.adminClient;
    }
}
