package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.class */
public class TopicProducerRegister implements Closeable {
    private final PulsarClient pulsarClient;
    private final SinkConfiguration sinkConfiguration;
    private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister = new HashMap();
    private final Map<String, Transaction> transactionRegister = new HashMap();

    public TopicProducerRegister(SinkConfiguration sinkConfiguration) {
        this.pulsarClient = PulsarClientFactory.createClient(sinkConfiguration);
        this.sinkConfiguration = sinkConfiguration;
    }

    public <T> TypedMessageBuilder<T> createMessageBuilder(String str, Schema<T> schema) {
        Producer<T> orCreateProducer = getOrCreateProducer(str, schema);
        return this.sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE ? orCreateProducer.newMessage(getOrCreateTransaction(str)) : orCreateProducer.newMessage();
    }

    public List<PulsarCommittable> prepareCommit() {
        ArrayList arrayList = new ArrayList(this.transactionRegister.size());
        this.transactionRegister.forEach((str, transaction) -> {
            arrayList.add(new PulsarCommittable(transaction.getTxnID(), str));
        });
        clearTransactions();
        return arrayList;
    }

    public void flush() throws IOException {
        Iterator<Map<SchemaInfo, Producer<?>>> it = this.producerRegister.values().iterator();
        while (it.hasNext()) {
            Iterator<Producer<?>> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                it2.next().flush();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Closer create = Closer.create();
        try {
            create.register(this::flush);
            create.register(this::abortTransactions);
            Map<String, Map<SchemaInfo, Producer<?>>> map = this.producerRegister;
            Objects.requireNonNull(map);
            create.register(map::clear);
            create.register(this.pulsarClient);
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private <T> Producer<T> getOrCreateProducer(String str, Schema<T> schema) {
        Map computeIfAbsent = this.producerRegister.computeIfAbsent(str, str2 -> {
            return new HashMap();
        });
        SchemaInfo schemaInfo = schema.getSchemaInfo();
        if (computeIfAbsent.containsKey(schemaInfo)) {
            return (Producer) computeIfAbsent.get(schemaInfo);
        }
        ProducerBuilder createProducerBuilder = PulsarSinkConfigUtils.createProducerBuilder(this.pulsarClient, schema, this.sinkConfiguration);
        createProducerBuilder.topic(str);
        Objects.requireNonNull(createProducerBuilder);
        Producer<T> producer = (Producer) PulsarExceptionUtils.sneakyClient(createProducerBuilder::create);
        computeIfAbsent.put(schemaInfo, producer);
        return producer;
    }

    private Transaction getOrCreateTransaction(String str) {
        return this.transactionRegister.computeIfAbsent(str, str2 -> {
            return PulsarTransactionUtils.createTransaction(this.pulsarClient, this.sinkConfiguration.getTransactionTimeoutMillis());
        });
    }

    private void abortTransactions() {
        if (this.transactionRegister.isEmpty()) {
            return;
        }
        TransactionCoordinatorClientImpl tcClient = ((PulsarClientImpl) this.pulsarClient).getTcClient();
        Preconditions.checkNotNull(tcClient);
        try {
            Closer create = Closer.create();
            try {
                Iterator<Transaction> it = this.transactionRegister.values().iterator();
                while (it.hasNext()) {
                    TxnID txnID = it.next().getTxnID();
                    create.register(() -> {
                        tcClient.abort(txnID);
                    });
                }
                clearTransactions();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    private void clearTransactions() {
        this.transactionRegister.clear();
    }
}
