package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.class */
public class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
    /** Poll interval used when the constructor is given a {@code null} duration (one hour). */
    private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
    /** Metrics namespace for the per-partition counters incremented in {@code ConvertToDescriptor}. */
    private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
    /** Interval handed to {@code Watch.growthOf(...).withPollInterval(...)} in {@code expand}. */
    private final Duration checkDuration;
    /** Factory that builds a Kafka consumer from {@link #kafkaConsumerConfig}; forwarded to the poll function. */
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn;
    /** Configuration map passed to {@link #kafkaConsumerFactoryFn} when a consumer is created. */
    private final Map<String, Object> kafkaConsumerConfig;
    /** Optional per-partition stop check; {@code ConvertToDescriptor} treats {@code null} as "never stop". */
    private final CheckStopReadingFn checkStopReadingFn;
    /** Topic names forwarded to {@code getAllTopicPartitions}. */
    private final Set<String> topics;
    /** Topic name pattern forwarded to {@code getAllTopicPartitions}. */
    private final Pattern topicPattern;
    /** Start-read time copied into every emitted {@code KafkaSourceDescriptor}. */
    private final Instant startReadTime;
    /** Stop-read time copied into every emitted {@code KafkaSourceDescriptor}. */
    private final Instant stopReadTime;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions$ConvertToDescriptor.class */
    /**
     * Converts each {@code KV<byte[], TopicPartition>} element into a {@link KafkaSourceDescriptor}
     * carrying the configured start/stop read times. Partitions for which the optional
     * {@link CheckStopReadingFn} returns {@code true} are dropped.
     */
    public static class ConvertToDescriptor extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
        private final CheckStopReadingFn checkStopReadingFn;
        private final Instant startReadTime;
        private final Instant stopReadTime;

        private ConvertToDescriptor(CheckStopReadingFn stopReadingFn, Instant startTime, Instant stopTime) {
            checkStopReadingFn = stopReadingFn;
            startReadTime = startTime;
            stopReadTime = stopTime;
        }

        /** Gives the optional stop-check function its setup call, if one was supplied. */
        @DoFn.Setup
        public void setup() throws Exception {
            if (checkStopReadingFn == null) {
                return;
            }
            checkStopReadingFn.setup();
        }

        /**
         * Emits one descriptor per partition and bumps a counter named after the partition.
         *
         * @param kv element whose value is the partition; a {@code null} value raises
         *     {@link NullPointerException}
         * @param outputReceiver receiver for the resulting descriptor
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<byte[], TopicPartition> kv, DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
            final TopicPartition partition = Objects.requireNonNull(kv.getValue());

            // Guard: a configured stop check that says "stop" suppresses both counter and output.
            if (checkStopReadingFn != null && checkStopReadingFn.apply(partition)) {
                return;
            }

            Metrics.counter(WatchForKafkaTopicPartitions.COUNTER_NAMESPACE, partition.toString()).inc();
            KafkaSourceDescriptor descriptor =
                    KafkaSourceDescriptor.of(partition, null, startReadTime, null, stopReadTime, null);
            outputReceiver.output(descriptor);
        }

        /** Gives the optional stop-check function its teardown call, if one was supplied. */
        @DoFn.Teardown
        public void teardown() throws Exception {
            if (checkStopReadingFn == null) {
                return;
            }
            checkStopReadingFn.teardown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions$WatchPartitionFn.class */
    /**
     * Poll function for {@link Watch}: every call fetches the current partition list through
     * {@code getAllTopicPartitions} and returns it as an incomplete poll result stamped with the
     * poll time.
     */
    public static class WatchPartitionFn extends Watch.Growth.PollFn<byte[], TopicPartition> {
        private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn;
        private final Map<String, Object> kafkaConsumerConfig;
        private final Set<String> topics;
        private final Pattern topicPattern;

        private WatchPartitionFn(
                SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactory,
                Map<String, Object> consumerConfig,
                Set<String> topicNames,
                Pattern topicNamePattern) {
            kafkaConsumerFactoryFn = consumerFactory;
            kafkaConsumerConfig = consumerConfig;
            topics = topicNames;
            topicPattern = topicNamePattern;
        }

        /**
         * Runs one poll.
         *
         * @param bArr the input element; not read by this implementation
         * @param context the poll context; not read by this implementation
         * @return an incomplete result whose timestamp and watermark are both the poll time
         * @throws Exception declared by the poll function contract
         */
        public Watch.Growth.PollResult<TopicPartition> apply(byte[] bArr, Contextful.Fn.Context context) throws Exception {
            // The clock is read before the partitions are fetched, as in the single-expression form.
            final Instant pollTime = Instant.now();
            Watch.Growth.PollResult<TopicPartition> snapshot =
                    Watch.Growth.PollResult.incomplete(
                            pollTime,
                            WatchForKafkaTopicPartitions.getAllTopicPartitions(
                                    kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern));
            return snapshot.withWatermark(pollTime);
        }
    }

    /**
     * Creates the transform.
     *
     * @param duration poll interval; {@code null} selects {@code DEFAULT_CHECK_DURATION}
     * @param serializableFunction factory that builds a Kafka consumer from {@code map}
     * @param map Kafka consumer configuration handed to the factory
     * @param checkStopReadingFn optional per-partition stop check; may be {@code null}
     * @param set topic names forwarded to {@code getAllTopicPartitions}
     * @param pattern topic name pattern forwarded to {@code getAllTopicPartitions}
     * @param instant start-read time stored on every emitted descriptor
     * @param instant2 stop-read time stored on every emitted descriptor
     */
    public WatchForKafkaTopicPartitions(Duration duration, SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction, Map<String, Object> map, CheckStopReadingFn checkStopReadingFn, Set<String> set, Pattern pattern, Instant instant, Instant instant2) {
        // The default is a non-null constant, so a plain null check is enough here.
        checkDuration = (duration != null) ? duration : DEFAULT_CHECK_DURATION;

        // Consumer construction inputs.
        kafkaConsumerFactoryFn = serializableFunction;
        kafkaConsumerConfig = map;

        // Topic selection.
        topics = set;
        topicPattern = pattern;

        // Per-descriptor settings.
        this.checkStopReadingFn = checkStopReadingFn;
        startReadTime = instant;
        stopReadTime = instant2;
    }

    /**
     * Builds the pipeline fragment: an impulse, a {@link Watch} transform that polls for topic
     * partitions at {@code checkDuration}, and a {@link ParDo} that converts each partition into a
     * {@link KafkaSourceDescriptor}.
     *
     * @param pBegin the pipeline start
     * @return the collection of source descriptors produced by the conversion step
     */
    public PCollection<KafkaSourceDescriptor> expand(PBegin pBegin) {
        PCollection<byte[]> impulse = pBegin.apply(Impulse.create());

        WatchPartitionFn pollFn =
                new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern);
        PCollection<KV<byte[], TopicPartition>> discovered =
                impulse.apply(
                        "Match new TopicPartitions",
                        Watch.growthOf(pollFn).withPollInterval(checkDuration));

        ConvertToDescriptor toDescriptor =
                new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime);
        return discovered.apply(ParDo.of(toDescriptor));
    }

    /**
     * Lists the topic partitions currently visible to a freshly created Kafka consumer.
     *
     * <p>When {@code topics} is non-empty, the partitions of each listed topic are returned.
     * Otherwise every topic reported by the consumer is considered, filtered by
     * {@code topicPattern} when one is given. The consumer is closed before returning.
     *
     * <p>NOTE(review): the decompiler could not recover this method and left a stub that always
     * threw {@link UnsupportedOperationException}; this body is a reconstruction and should be
     * confirmed against the upstream Apache Beam source.
     *
     * @param kafkaConsumerFactoryFn factory that builds the consumer from {@code kafkaConsumerConfig}
     * @param kafkaConsumerConfig configuration passed to the factory
     * @param topics explicit topic names; {@code null} or empty means "use the pattern"
     * @param topicPattern full-match pattern for topic names; {@code null} accepts every topic
     * @return a new mutable list of matching partitions, possibly empty
     */
    @VisibleForTesting
    static List<TopicPartition> getAllTopicPartitions(
            SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
            Map<String, Object> kafkaConsumerConfig,
            Set<String> topics,
            Pattern topicPattern) {
        List<TopicPartition> current = new ArrayList<>();
        // try-with-resources so the consumer is released even if a metadata call fails.
        try (Consumer<byte[], byte[]> kafkaConsumer = kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
            if (topics != null && !topics.isEmpty()) {
                // Explicit topic list: ask for each topic's partitions directly.
                for (String topic : topics) {
                    kafkaConsumer
                            .partitionsFor(topic)
                            .forEach(info -> current.add(new TopicPartition(topic, info.partition())));
                }
            } else {
                // No explicit topics: scan all topics and keep those the pattern fully matches.
                kafkaConsumer
                        .listTopics()
                        .forEach(
                                (topicName, partitionInfos) -> {
                                    if (topicPattern == null || topicPattern.matcher(topicName).matches()) {
                                        partitionInfos.forEach(
                                                info -> current.add(new TopicPartition(info.topic(), info.partition())));
                                    }
                                });
            }
        }
        return current;
    }
}
