package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
import org.apache.pulsar.shade.com.google.common.cache.CacheLoader;
import org.apache.pulsar.shade.com.google.common.cache.LoadingCache;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the topic partitions a Pulsar sink can write to.
 *
 * <p>The topic names passed to the constructor are split into two groups using {@link
 * TopicNameUtils#isPartition(String)}: names that already denote a partition, and all other topic
 * names. After {@link #open(SinkConfiguration, ProcessingTimeService)} has been called, the
 * partition count of every non-partition topic is looked up through {@link PulsarAdmin}, cached,
 * and the resulting list of {@link TopicPartition}s is rebuilt on a processing-time timer.
 *
 * <p>The admin client, the timer service, the refresh interval and the cache are {@code transient}
 * and only exist after {@code open} has been called on this instance.
 */
@Internal
public class MetadataListener implements Serializable, Closeable {
    private static final long serialVersionUID = 6186948471557507522L;
    private static final Logger LOG = LoggerFactory.getLogger(MetadataListener.class);

    /** Names from the constructor argument for which {@code TopicNameUtils.isPartition} is true. */
    private final ImmutableList<String> partitions;

    /** All remaining names from the constructor argument. */
    private final ImmutableList<String> topics;

    /** Result of the most recent successful {@link #updateTopicMetadata()} run. */
    private ImmutableList<TopicPartition> availablePartitions;

    private transient PulsarAdmin pulsarAdmin;
    private transient Long topicMetadataRefreshInterval;
    private transient ProcessingTimeService timeService;

    /** Topic name to partition count; the value is empty when the admin API reported "not found". */
    private transient LoadingCache<String, Optional<Integer>> topicPartitionCache;

    /** Creates a listener with no predefined topics. */
    public MetadataListener() {
        this(Collections.emptyList());
    }

    /**
     * Creates a listener for the given topic names.
     *
     * @param list topic names; each one is classified as a partition name or a plain topic name
     */
    public MetadataListener(List<String> list) {
        ImmutableList.Builder<String> partitionNames = ImmutableList.builder();
        ImmutableList.Builder<String> topicNames = ImmutableList.builder();
        for (String name : list) {
            if (TopicNameUtils.isPartition(name)) {
                partitionNames.add(name);
            } else {
                topicNames.add(name);
            }
        }
        this.partitions = partitionNames.build();
        this.topics = topicNames.build();
        this.availablePartitions = ImmutableList.of();
    }

    /**
     * Creates the admin client and the partition-count cache, performs an initial metadata update
     * and, unless no plain topics were provided, schedules the periodic refresh.
     *
     * @param sinkConfiguration supplies the admin client settings and the refresh interval
     * @param processingTimeService used to schedule the periodic metadata refresh
     * @throws PulsarClientException declared for the admin client creation
     * @throws FlinkRuntimeException if the initial metadata update fails with a {@link
     *     PulsarAdminException}
     */
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService processingTimeService) throws PulsarClientException {
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sinkConfiguration);
        this.topicMetadataRefreshInterval = Long.valueOf(sinkConfiguration.getTopicMetadataRefreshInterval());
        this.timeService = processingTimeService;
        // Cached entries expire one refresh interval (in milliseconds) after they were written.
        this.topicPartitionCache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(this.topicMetadataRefreshInterval.longValue(), TimeUnit.MILLISECONDS)
                        .build(
                                new CacheLoader<String, Optional<Integer>>() {
                                    @Override
                                    @ParametersAreNonnullByDefault
                                    public Optional<Integer> load(String str) throws PulsarAdminException {
                                        try {
                                            return Optional.of(
                                                    MetadataListener.this
                                                            .pulsarAdmin
                                                            .topics()
                                                            .getPartitionedTopicMetadata(str)
                                                            .partitions);
                                        } catch (PulsarAdminException.NotFoundException e) {
                                            // "Not found" is cached as an empty value, not an error.
                                            return Optional.empty();
                                        }
                                    }
                                });
        try {
            updateTopicMetadata();
            if (this.topics.isEmpty()) {
                LOG.info("No topics have been provided, skip metadata update timer.");
            } else {
                registerNextTopicMetadataUpdateTimer();
            }
        } catch (PulsarAdminException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Returns the partitions computed by the most recent successful metadata update.
     *
     * @return an immutable list; empty until the first update has run
     */
    public List<TopicPartition> availablePartitions() {
        return this.availablePartitions;
    }

    /**
     * Looks up the metadata of a topic.
     *
     * <p>If {@code str} is a partition name, a {@link TopicMetadata} with partition size {@code 0}
     * is returned without consulting the cache. Otherwise the partition count is read from the
     * cache, which loads it through the admin client on a miss.
     *
     * @param str the topic name to query
     * @return the topic metadata, or empty if the cache holds an empty value for this topic
     * @throws PulsarAdminException if loading failed and a {@link PulsarAdminException} is found in
     *     the failure's cause chain
     * @throws FlinkRuntimeException if loading failed for any other reason
     */
    public Optional<TopicMetadata> queryTopicMetadata(String str) throws PulsarAdminException {
        if (TopicNameUtils.isPartition(str)) {
            return Optional.of(new TopicMetadata(str, 0));
        }
        try {
            return this.topicPartitionCache.get(str).map(partitionSize -> new TopicMetadata(str, partitionSize));
        } catch (ExecutionException e) {
            // Surface the admin exception itself so callers can handle it by type.
            Optional<PulsarAdminException> adminFailure =
                    ExceptionUtils.findThrowable(e, PulsarAdminException.class);
            if (adminFailure.isPresent()) {
                throw adminFailure.get();
            }
            throw new FlinkRuntimeException(e);
        }
    }

    /** Asks the cache to refresh the entry for the given topic. */
    @VisibleForTesting
    void refreshTopicMetadata(String str) {
        this.topicPartitionCache.refresh(str);
    }

    /** Closes the admin client if {@code open} created one. */
    @Override
    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    /** Schedules the next metadata update one refresh interval after the current processing time. */
    private void registerNextTopicMetadataUpdateTimer() {
        long triggerTime = this.timeService.getCurrentProcessingTime() + this.topicMetadataRefreshInterval.longValue();
        this.timeService.registerTimer(triggerTime, time -> triggerNextTopicMetadataUpdate());
    }

    /**
     * Timer callback: refreshes the metadata and schedules the next run. A {@link
     * PulsarAdminException} is logged and does not stop the periodic refresh.
     */
    private void triggerNextTopicMetadataUpdate() {
        try {
            updateTopicMetadata();
        } catch (PulsarAdminException e) {
            LOG.warn("Failed to update the topic metadata, will retry on the next refresh.", e);
        }
        registerNextTopicMetadataUpdateTimer();
    }

    /**
     * Rebuilds {@link #availablePartitions} from the plain topics (via {@link
     * #queryTopicMetadata(String)}) followed by the explicitly given partitions. The field is only
     * replaced after the whole list has been built, so a failure keeps the previous value.
     *
     * @throws PulsarAdminException if querying any topic fails
     */
    private void updateTopicMetadata() throws PulsarAdminException {
        ImmutableList.Builder<TopicPartition> discovered = ImmutableList.builder();
        for (String topic : this.topics) {
            Optional<TopicMetadata> queried = queryTopicMetadata(topic);
            if (!queried.isPresent()) {
                // No metadata available for this topic; it contributes no partitions.
                continue;
            }
            TopicMetadata topicMetadata = queried.get();
            int partitionSize = topicMetadata.getPartitionSize();
            if (topicMetadata.isPartitioned()) {
                for (int i = 0; i < partitionSize; i++) {
                    discovered.add(new TopicPartition(topic, i));
                }
            } else {
                discovered.add(new TopicPartition(topic));
            }
        }
        for (String partition : this.partitions) {
            TopicName topicName = TopicName.get(partition);
            discovered.add(new TopicPartition(topicName.getPartitionedTopicName(), topicName.getPartitionIndex()));
        }
        this.availablePartitions = discovered.build();
    }
}
