package org.apache.flink.connector.kafka.dynamic.source.reader;

import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup;
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.class */
public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafkaSourceSplit> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceReader.class);
    private final KafkaRecordDeserializationSchema<T> deserializationSchema;
    private final Properties properties;
    private final MetricGroup dynamicKafkaSourceMetricGroup;
    private final Gauge<Integer> kafkaClusterCount;
    private final SourceReaderContext readerContext;
    private final KafkaClusterMetricGroupManager kafkaClusterMetricGroupManager;
    private final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap = new TreeMap();
    private final Map<String, Properties> clustersProperties;
    private final List<DynamicKafkaSourceSplit> pendingSplits;
    private MultipleFuturesAvailabilityHelper availabilityHelper;
    private boolean isActivelyConsumingSplits;
    private boolean isNoMoreSplits;
    private AtomicBoolean restartingReaders;

    /* renamed from: org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$core$io$InputStatus = new int[InputStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$core$io$InputStatus[InputStatus.MORE_AVAILABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$core$io$InputStatus[InputStatus.NOTHING_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public DynamicKafkaSourceReader(SourceReaderContext sourceReaderContext, KafkaRecordDeserializationSchema<T> kafkaRecordDeserializationSchema, Properties properties) {
        this.readerContext = sourceReaderContext;
        this.deserializationSchema = kafkaRecordDeserializationSchema;
        this.properties = properties;
        NavigableMap<String, KafkaSourceReader<T>> navigableMap = this.clusterReaderMap;
        navigableMap.getClass();
        this.kafkaClusterCount = navigableMap::size;
        this.dynamicKafkaSourceMetricGroup = sourceReaderContext.metricGroup().addGroup(KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP);
        this.kafkaClusterMetricGroupManager = new KafkaClusterMetricGroupManager();
        this.pendingSplits = new ArrayList();
        this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0);
        this.isNoMoreSplits = false;
        this.isActivelyConsumingSplits = false;
        this.restartingReaders = new AtomicBoolean();
        this.clustersProperties = new HashMap();
    }

    public void start() {
        logger.trace("Starting reader for subtask index={}", Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        this.readerContext.metricGroup().gauge("kafkaClusterCount", this.kafkaClusterCount);
        this.readerContext.sendSourceEventToCoordinator(new GetMetadataUpdateEvent());
    }

    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        if (this.clusterReaderMap.isEmpty()) {
            return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }
        if (this.restartingReaders.get()) {
            logger.info("Poll next invoked while restarting readers");
            return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }
        boolean z = false;
        boolean z2 = false;
        Iterator<Map.Entry<String, KafkaSourceReader<T>>> it = this.clusterReaderMap.entrySet().iterator();
        while (it.hasNext()) {
            switch (AnonymousClass2.$SwitchMap$org$apache$flink$core$io$InputStatus[it.next().getValue().pollNext(readerOutput).ordinal()]) {
                case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP /* 1 */:
                    z = true;
                    break;
                case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK /* 2 */:
                    z2 = true;
                    break;
            }
        }
        return logAndReturnInputStatus(consolidateInputStatus(z, z2));
    }

    private InputStatus consolidateInputStatus(boolean z, boolean z2) {
        return z ? InputStatus.MORE_AVAILABLE : z2 ? InputStatus.NOTHING_AVAILABLE : InputStatus.END_OF_INPUT;
    }

    public void addSplits(List<DynamicKafkaSourceSplit> list) {
        logger.info("Adding splits to reader {}: {}", Integer.valueOf(this.readerContext.getIndexOfSubtask()), list);
        if (!this.isActivelyConsumingSplits) {
            this.pendingSplits.addAll(list);
            return;
        }
        ArrayListMultimap create = ArrayListMultimap.create();
        for (DynamicKafkaSourceSplit dynamicKafkaSourceSplit : list) {
            create.put(dynamicKafkaSourceSplit.getKafkaClusterId(), dynamicKafkaSourceSplit);
        }
        boolean z = false;
        for (String str : create.keySet()) {
            if (!this.clusterReaderMap.containsKey(str)) {
                try {
                    KafkaSourceReader<T> createReader = createReader(str);
                    this.clusterReaderMap.put(str, createReader);
                    createReader.start();
                    z = true;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            ((KafkaSourceReader) this.clusterReaderMap.get(str)).addSplits(create.get(str));
        }
        if (z) {
            completeAndResetAvailabilityHelper();
        }
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        Preconditions.checkArgument(sourceEvent instanceof MetadataUpdateEvent, "Received invalid source event: " + sourceEvent);
        logger.info("Received source event {}: subtask={}", sourceEvent, Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        Set<KafkaStream> kafkaStreams = ((MetadataUpdateEvent) sourceEvent).getKafkaStreams();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator<KafkaStream> it = kafkaStreams.iterator();
        while (it.hasNext()) {
            for (Map.Entry<String, ClusterMetadata> entry : it.next().getClusterMetadataMap().entrySet()) {
                ((Set) hashMap.computeIfAbsent(entry.getKey(), str -> {
                    return new HashSet();
                })).addAll(entry.getValue().getTopics());
                hashMap2.put(entry.getKey(), entry.getValue().getProperties());
            }
        }
        List<DynamicKafkaSourceSplit> snapshotStateFromAllReaders = snapshotStateFromAllReaders(-1L);
        logger.info("Snapshotting split state for reader {}: {}", Integer.valueOf(this.readerContext.getIndexOfSubtask()), snapshotStateFromAllReaders);
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        for (DynamicKafkaSourceSplit dynamicKafkaSourceSplit : snapshotStateFromAllReaders) {
            ((Set) hashMap3.computeIfAbsent(dynamicKafkaSourceSplit.getKafkaClusterId(), str2 -> {
                return new HashSet();
            })).add(dynamicKafkaSourceSplit.getKafkaPartitionSplit().getTopic());
            if (hashMap.containsKey(dynamicKafkaSourceSplit.getKafkaClusterId()) && ((Set) hashMap.get(dynamicKafkaSourceSplit.getKafkaClusterId())).contains(dynamicKafkaSourceSplit.getKafkaPartitionSplit().getTopic())) {
                ((List) hashMap4.computeIfAbsent(dynamicKafkaSourceSplit.getKafkaClusterId(), str3 -> {
                    return new ArrayList();
                })).add(dynamicKafkaSourceSplit);
            } else {
                logger.info("Skipping outdated split due to metadata changes: {}", dynamicKafkaSourceSplit);
            }
        }
        if (hashMap.equals(hashMap3)) {
            this.clustersProperties.clear();
            this.clustersProperties.putAll(hashMap2);
        } else {
            this.restartingReaders.set(true);
            closeAllReadersAndClearState();
            this.clustersProperties.putAll(hashMap2);
            for (String str4 : hashMap.keySet()) {
                try {
                    KafkaSourceReader<T> createReader = createReader(str4);
                    this.clusterReaderMap.put(str4, createReader);
                    if (hashMap4.containsKey(str4)) {
                        createReader.addSplits((List) hashMap4.get(str4));
                    }
                    createReader.start();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            completeAndResetAvailabilityHelper();
        }
        if (!this.isActivelyConsumingSplits) {
            this.isActivelyConsumingSplits = true;
        }
        if (this.pendingSplits.isEmpty()) {
            return;
        }
        addSplits((List) this.pendingSplits.stream().filter(dynamicKafkaSourceSplit2 -> {
            boolean isSplitForActiveClusters = isSplitForActiveClusters(dynamicKafkaSourceSplit2, hashMap);
            if (!isSplitForActiveClusters) {
                logger.info("Removing invalid split for reader: {}", dynamicKafkaSourceSplit2);
            }
            return isSplitForActiveClusters;
        }).collect(Collectors.toList()));
        this.pendingSplits.clear();
        if (this.isNoMoreSplits) {
            notifyNoMoreSplits();
        }
    }

    private static boolean isSplitForActiveClusters(DynamicKafkaSourceSplit dynamicKafkaSourceSplit, Map<String, Set<String>> map) {
        return map.containsKey(dynamicKafkaSourceSplit.getKafkaClusterId()) && map.get(dynamicKafkaSourceSplit.getKafkaClusterId()).contains(dynamicKafkaSourceSplit.getKafkaPartitionSplit().getTopic());
    }

    public List<DynamicKafkaSourceSplit> snapshotState(long j) {
        List<DynamicKafkaSourceSplit> snapshotStateFromAllReaders = snapshotStateFromAllReaders(j);
        snapshotStateFromAllReaders.addAll(this.pendingSplits);
        return snapshotStateFromAllReaders;
    }

    private List<DynamicKafkaSourceSplit> snapshotStateFromAllReaders(long j) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, KafkaSourceReader<T>> entry : this.clusterReaderMap.entrySet()) {
            entry.getValue().snapshotState(j).forEach(kafkaPartitionSplit -> {
                arrayList.add(new DynamicKafkaSourceSplit((String) entry.getKey(), kafkaPartitionSplit));
            });
        }
        return arrayList;
    }

    public CompletableFuture<Void> isAvailable() {
        this.availabilityHelper.resetToUnAvailable();
        syncAvailabilityHelperWithReaders();
        return this.availabilityHelper.getAvailableFuture();
    }

    public void notifyNoMoreSplits() {
        logger.info("notify no more splits for reader {}", Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        if (this.pendingSplits.isEmpty()) {
            this.clusterReaderMap.values().forEach((v0) -> {
                v0.notifyNoMoreSplits();
            });
        }
        this.isNoMoreSplits = true;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        logger.debug("Notify checkpoint complete for {}", this.clusterReaderMap.keySet());
        Iterator<KafkaSourceReader<T>> it = this.clusterReaderMap.values().iterator();
        while (it.hasNext()) {
            it.next().notifyCheckpointComplete(j);
        }
    }

    public void close() throws Exception {
        Iterator<KafkaSourceReader<T>> it = this.clusterReaderMap.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.kafkaClusterMetricGroupManager.close();
    }

    private KafkaSourceReader<T> createReader(String str) throws Exception {
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        Properties properties = new Properties();
        KafkaPropertiesUtil.copyProperties(this.properties, properties);
        KafkaPropertiesUtil.copyProperties((Properties) Preconditions.checkNotNull(this.clustersProperties.get(str), "Properties for cluster %s is not found. Current Kafka cluster ids: %s", new Object[]{str, this.clustersProperties.keySet()}), properties);
        KafkaPropertiesUtil.setClientIdPrefix(properties, str);
        final KafkaClusterMetricGroup kafkaClusterMetricGroup = new KafkaClusterMetricGroup(this.dynamicKafkaSourceMetricGroup, this.readerContext.metricGroup(), str);
        this.kafkaClusterMetricGroupManager.register(str, kafkaClusterMetricGroup);
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics = new KafkaSourceReaderMetrics(kafkaClusterMetricGroup);
        this.deserializationSchema.open(new DeserializationSchema.InitializationContext() { // from class: org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.1
            public MetricGroup getMetricGroup() {
                return kafkaClusterMetricGroup.addGroup("deserializer");
            }

            public UserCodeClassLoader getUserCodeClassLoader() {
                return DynamicKafkaSourceReader.this.readerContext.getUserCodeClassLoader();
            }
        });
        return new KafkaSourceReader<>(futureCompletingBlockingQueue, new KafkaSourceFetcherManager(futureCompletingBlockingQueue, () -> {
            return new KafkaPartitionSplitReaderWrapper(properties, this.readerContext, kafkaSourceReaderMetrics, str);
        }, collection -> {
        }), new KafkaRecordEmitter(this.deserializationSchema), toConfiguration(properties), this.readerContext, kafkaSourceReaderMetrics);
    }

    private void completeAndResetAvailabilityHelper() {
        CompletableFuture availableFuture = this.availabilityHelper.getAvailableFuture();
        this.availabilityHelper = new MultipleFuturesAvailabilityHelper(this.clusterReaderMap.size());
        syncAvailabilityHelperWithReaders();
        this.availabilityHelper.getAvailableFuture().whenComplete((BiConsumer) (obj, th) -> {
            this.restartingReaders.set(false);
            availableFuture.complete(null);
        });
    }

    private void syncAvailabilityHelperWithReaders() {
        int i = 0;
        Iterator<String> it = this.clusterReaderMap.navigableKeySet().iterator();
        while (it.hasNext()) {
            this.availabilityHelper.anyOf(i, ((KafkaSourceReader) this.clusterReaderMap.get(it.next())).isAvailable());
            i++;
        }
    }

    private void closeAllReadersAndClearState() {
        for (Map.Entry<String, KafkaSourceReader<T>> entry : this.clusterReaderMap.entrySet()) {
            try {
                logger.info("Closing sub reader in reader {} for cluster: {}", Integer.valueOf(this.readerContext.getIndexOfSubtask()), entry.getKey());
                entry.getValue().close();
                this.kafkaClusterMetricGroupManager.close(entry.getKey());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.clusterReaderMap.clear();
        this.clustersProperties.clear();
    }

    static Configuration toConfiguration(Properties properties) {
        Configuration configuration = new Configuration();
        properties.stringPropertyNames().forEach(str -> {
            configuration.setString(str, properties.getProperty(str));
        });
        return configuration;
    }

    private InputStatus logAndReturnInputStatus(InputStatus inputStatus) {
        if (InputStatus.END_OF_INPUT.equals(inputStatus)) {
            logger.info("inputStatus={}, subtaskIndex={}", inputStatus, Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        } else {
            logger.trace("inputStatus={}, subtaskIndex={}", inputStatus, Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        }
        return inputStatus;
    }

    @VisibleForTesting
    public MultipleFuturesAvailabilityHelper getAvailabilityHelper() {
        return this.availabilityHelper;
    }

    @VisibleForTesting
    public boolean isActivelyConsumingSplits() {
        return this.isActivelyConsumingSplits;
    }
}
