package org.apache.flink.connector.pulsar.sink.writer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.class */
public class PulsarWriter<IN> implements TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, PulsarCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
    private final PulsarSerializationSchema<IN> serializationSchema;
    private final TopicMetadataListener metadataListener;
    private final TopicRouter<IN> topicRouter;
    private final MessageDelayer<IN> messageDelayer;
    private final DeliveryGuarantee deliveryGuarantee;
    private final PulsarSinkContext sinkContext;
    private final TopicProducerRegister producerRegister;
    private final MailboxExecutor mailboxExecutor;
    private final AtomicLong pendingMessages = new AtomicLong(0);

    public PulsarWriter(SinkConfiguration sinkConfiguration, PulsarSerializationSchema<IN> pulsarSerializationSchema, TopicMetadataListener topicMetadataListener, TopicRouter<IN> topicRouter, MessageDelayer<IN> messageDelayer, Sink.InitContext initContext) {
        Preconditions.checkNotNull(sinkConfiguration);
        this.serializationSchema = (PulsarSerializationSchema) Preconditions.checkNotNull(pulsarSerializationSchema);
        this.metadataListener = (TopicMetadataListener) Preconditions.checkNotNull(topicMetadataListener);
        this.topicRouter = (TopicRouter) Preconditions.checkNotNull(topicRouter);
        this.messageDelayer = (MessageDelayer) Preconditions.checkNotNull(messageDelayer);
        Preconditions.checkNotNull(initContext);
        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
        this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
        this.metadataListener.open(sinkConfiguration, initContext.getProcessingTimeService());
        this.topicRouter.open(sinkConfiguration);
        try {
            this.serializationSchema.open(initContext.asSerializationSchemaInitializationContext(), this.sinkContext, sinkConfiguration);
            this.producerRegister = new TopicProducerRegister(sinkConfiguration);
            this.mailboxExecutor = initContext.getMailboxExecutor();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", e);
        }
    }

    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        PulsarMessage<?> serialize = this.serializationSchema.serialize(in, this.sinkContext);
        String route = this.topicRouter.route(in, serialize.getKey(), this.metadataListener.availableTopics(), this.sinkContext);
        TypedMessageBuilder<?> createMessageBuilder = createMessageBuilder(route, context, serialize);
        long deliverAt = this.messageDelayer.deliverAt(in, this.sinkContext);
        if (deliverAt > 0) {
            createMessageBuilder.deliverAt(deliverAt);
        }
        if (this.deliveryGuarantee == DeliveryGuarantee.NONE) {
            createMessageBuilder.sendAsync();
        } else {
            this.pendingMessages.incrementAndGet();
            createMessageBuilder.sendAsync().whenComplete((messageId, th) -> {
                this.pendingMessages.decrementAndGet();
                if (th != null) {
                    this.mailboxExecutor.execute(() -> {
                        throw new FlinkRuntimeException("Failed to send data to Pulsar " + route, th);
                    }, "Failed to send data to Pulsar");
                } else {
                    LOG.debug("Sent message to Pulsar {} with message id {}", route, messageId);
                }
            });
        }
    }

    private TypedMessageBuilder<?> createMessageBuilder(String str, SinkWriter.Context context, PulsarMessage<?> pulsarMessage) {
        TypedMessageBuilder<?> createMessageBuilder = this.producerRegister.createMessageBuilder(str, pulsarMessage.getSchema());
        byte[] orderingKey = pulsarMessage.getOrderingKey();
        if (orderingKey != null && orderingKey.length > 0) {
            createMessageBuilder.orderingKey(orderingKey);
        }
        String key = pulsarMessage.getKey();
        if (!Strings.isNullOrEmpty(key)) {
            createMessageBuilder.key(key);
        }
        long eventTime = pulsarMessage.getEventTime();
        if (eventTime > 0) {
            createMessageBuilder.eventTime(eventTime);
        } else {
            Long timestamp = context.timestamp();
            if (timestamp != null && timestamp.longValue() > 0) {
                createMessageBuilder.eventTime(timestamp.longValue());
            }
        }
        createMessageBuilder.value(pulsarMessage.getValue());
        Map<String, String> properties = pulsarMessage.getProperties();
        if (properties != null && !properties.isEmpty()) {
            createMessageBuilder.properties(properties);
        }
        Long sequenceId = pulsarMessage.getSequenceId();
        if (sequenceId != null) {
            createMessageBuilder.sequenceId(sequenceId.longValue());
        }
        List<String> replicationClusters = pulsarMessage.getReplicationClusters();
        if (replicationClusters != null && !replicationClusters.isEmpty()) {
            createMessageBuilder.replicationClusters(replicationClusters);
        }
        if (pulsarMessage.isDisableReplication()) {
            createMessageBuilder.disableReplication();
        }
        return createMessageBuilder;
    }

    public void flush(boolean z) throws IOException {
        if (z || this.deliveryGuarantee != DeliveryGuarantee.NONE) {
            LOG.info("Flush the pending messages to Pulsar.");
            this.producerRegister.flush();
            while (this.pendingMessages.longValue() > 0) {
                this.producerRegister.flush();
            }
        }
    }

    public Collection<PulsarCommittable> prepareCommit() {
        return this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE ? this.producerRegister.prepareCommit() : Collections.emptyList();
    }

    public void close() throws Exception {
        IOUtils.closeAll(new AutoCloseable[]{this.metadataListener, this.producerRegister});
    }
}
