package org.apache.flink.connector.kafka.sink.internal;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.class */
public class ProducerPoolImpl implements ProducerPool {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
    private final Properties kafkaProducerConfig;
    private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit;
    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = new ArrayDeque();
    private final Map<String, ProducerEntry> producerByTransactionalId = new TreeMap();
    private final NavigableMap<CheckpointTransaction, String> transactionalIdsByCheckpoint = new TreeMap(Comparator.comparing((v0) -> {
        return v0.getCheckpointId();
    }).thenComparing((v0) -> {
        return v0.getTransactionalId();
    }));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl$ProducerEntry.class */
    public static class ProducerEntry {

        @Nullable
        private final FlinkKafkaInternalProducer<byte[], byte[]> producer;
        private final CheckpointTransaction checkpointedTransaction;

        private ProducerEntry(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer, CheckpointTransaction checkpointTransaction) {
            this.producer = flinkKafkaInternalProducer;
            this.checkpointedTransaction = (CheckpointTransaction) Preconditions.checkNotNull(checkpointTransaction, "checkpointedTransaction must not be null");
        }

        public CheckpointTransaction getCheckpointedTransaction() {
            return this.checkpointedTransaction;
        }

        @Nullable
        public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
            return this.producer;
        }

        public String toString() {
            return this.producer != null ? this.producer.toString() : this.checkpointedTransaction.toString();
        }
    }

    public ProducerPoolImpl(Properties properties, Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> consumer, Collection<CheckpointTransaction> collection) {
        this.kafkaProducerConfig = (Properties) Preconditions.checkNotNull(properties, "kafkaProducerConfig must not be null");
        this.producerInit = (Consumer) Preconditions.checkNotNull(consumer, "producerInit must not be null");
        initPrecommittedTransactions(collection);
    }

    @Override // org.apache.flink.connector.kafka.sink.internal.ProducerPool
    public void recycleByTransactionId(String str, boolean z) {
        ProducerEntry remove = this.producerByTransactionalId.remove(str);
        LOG.debug("Transaction {} finished, producer {}", str, remove);
        if (remove == null) {
            LOG.info("Received unmatched producer for transaction {}. This is expected during rescale.", str);
            return;
        }
        long checkpointId = remove.getCheckpointedTransaction().getCheckpointId();
        boolean z2 = this.transactionalIdsByCheckpoint.firstKey().getCheckpointId() != checkpointId;
        this.transactionalIdsByCheckpoint.remove(remove.getCheckpointedTransaction());
        if (z) {
            recycleProducer(remove.getProducer());
        } else {
            closeProducer(remove.getProducer());
        }
        if (z2) {
            Iterator<Map.Entry<CheckpointTransaction, String>> it = this.transactionalIdsByCheckpoint.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<CheckpointTransaction, String> next = it.next();
                if (next.getKey().getCheckpointId() < checkpointId) {
                    it.remove();
                    closeProducer(this.producerByTransactionalId.remove(next.getValue()).getProducer());
                }
            }
        }
    }

    private void closeProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
        if (flinkKafkaInternalProducer != null) {
            flinkKafkaInternalProducer.close();
        }
    }

    @Override // org.apache.flink.connector.kafka.sink.internal.ProducerPool
    public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
        recycleProducer(flinkKafkaInternalProducer);
        this.transactionalIdsByCheckpoint.remove(this.producerByTransactionalId.remove(flinkKafkaInternalProducer.getTransactionalId()).getCheckpointedTransaction());
    }

    private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
        if (flinkKafkaInternalProducer == null) {
            return;
        }
        try {
            if (flinkKafkaInternalProducer.isInTransaction()) {
                flinkKafkaInternalProducer.commitTransaction();
            }
            this.producerPool.add(flinkKafkaInternalProducer);
            LOG.debug("Recycling {}, new pool size {}", flinkKafkaInternalProducer, Integer.valueOf(this.producerPool.size()));
        } catch (KafkaException e) {
            closeProducer(flinkKafkaInternalProducer);
            LOG.debug("Encountered exception while double-committing, discarding producer {}: {}", flinkKafkaInternalProducer, e);
        }
    }

    private void initPrecommittedTransactions(Collection<CheckpointTransaction> collection) {
        for (CheckpointTransaction checkpointTransaction : collection) {
            this.transactionalIdsByCheckpoint.put(checkpointTransaction, checkpointTransaction.getTransactionalId());
            this.producerByTransactionalId.put(checkpointTransaction.getTransactionalId(), new ProducerEntry(null, checkpointTransaction));
        }
        LOG.debug("Initialized ongoing transactions from state {}", collection);
    }

    @Override // org.apache.flink.connector.kafka.sink.internal.ProducerPool
    public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(String str, long j) {
        FlinkKafkaInternalProducer<byte[], byte[]> poll = this.producerPool.poll();
        if (poll == null) {
            poll = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, str);
            this.producerInit.accept(poll);
        } else if (str != null) {
            poll.setTransactionId(str);
        }
        if (str != null) {
            CheckpointTransaction checkpointTransaction = new CheckpointTransaction(str, j);
            ProducerEntry put = this.producerByTransactionalId.put(str, new ProducerEntry(poll, checkpointTransaction));
            this.transactionalIdsByCheckpoint.put(checkpointTransaction, str);
            Preconditions.checkState(put == null, "Transaction %s already ongoing existing producer %s; new producer %s", new Object[]{str, put, poll});
            poll.initTransactions();
        }
        LOG.debug("getProducer {}, new pool size {}", poll, Integer.valueOf(this.producerPool.size()));
        return poll;
    }

    @Override // org.apache.flink.connector.kafka.sink.internal.ProducerPool
    public Collection<CheckpointTransaction> getOngoingTransactions() {
        return new ArrayList(this.transactionalIdsByCheckpoint.keySet());
    }

    @VisibleForTesting
    public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() {
        return this.producerPool;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.debug("Closing used producers {} and free producers {}", this.producerByTransactionalId, this.producerPool);
        Deque<FlinkKafkaInternalProducer<byte[], byte[]>> deque = this.producerPool;
        Objects.requireNonNull(deque);
        Map<String, ProducerEntry> map = this.producerByTransactionalId;
        Objects.requireNonNull(map);
        IOUtils.closeAll(new AutoCloseable[]{() -> {
            IOUtils.closeAll(this.producerPool);
        }, () -> {
            IOUtils.closeAll((Iterable) this.producerByTransactionalId.values().stream().map((v0) -> {
                return v0.getProducer();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList()));
        }, deque::clear, map::clear});
    }
}
