package org.apache.flink.connector.kafka.dynamic.source;

import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.StreamPatternSubscriber;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder for {@link DynamicKafkaSource}.
 *
 * <p>Before {@link #build()} is called the caller must supply exactly one stream subscriber (via
 * {@link #setStreamIds(Set)}, {@link #setStreamPattern(Pattern)} or
 * {@link #setKafkaStreamSubscriber(KafkaStreamSubscriber)}), a {@link KafkaMetadataService} and a
 * {@link KafkaRecordDeserializationSchema}. Starting offsets default to
 * {@link OffsetsInitializer#earliest()} and the source defaults to
 * {@link Boundedness#CONTINUOUS_UNBOUNDED} with a {@link NoStoppingOffsetsInitializer}.
 *
 * <p>User properties are accumulated in a single {@link Properties} instance; {@link #build()}
 * fills in and, for some keys, forcibly overrides entries in it (see
 * {@code setRequiredConsumerProperties}).
 *
 * @param <T> type of the records produced by the built source
 */
@Experimental
public class DynamicKafkaSourceBuilder<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceBuilder.class);

    // Kafka consumer configuration keys used by this builder.
    private static final String GROUP_ID = "group.id";
    private static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    private static final String AUTO_OFFSET_RESET = "auto.offset.reset";
    private static final String KEY_DESERIALIZER = "key.deserializer";
    private static final String VALUE_DESERIALIZER = "value.deserializer";

    private KafkaStreamSubscriber kafkaStreamSubscriber = null;
    private KafkaMetadataService kafkaMetadataService = null;
    private KafkaRecordDeserializationSchema<T> deserializationSchema = null;
    private OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
    private OffsetsInitializer stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private final Properties props = new Properties();

    /**
     * Subscribes to a fixed set of stream ids.
     *
     * @param streamIds the stream ids to consume; must not be null
     * @return this builder
     * @throws IllegalStateException if a subscriber has already been configured
     */
    public DynamicKafkaSourceBuilder<T> setStreamIds(Set<String> streamIds) {
        Preconditions.checkNotNull(streamIds);
        ensureSubscriberIsNull("streamIds");
        this.kafkaStreamSubscriber = new KafkaStreamSetSubscriber(streamIds);
        return this;
    }

    /**
     * Subscribes to the streams selected by a {@link StreamPatternSubscriber} built from the
     * given pattern.
     *
     * @param streamPattern the pattern handed to the subscriber; must not be null
     * @return this builder
     * @throws IllegalStateException if a subscriber has already been configured
     */
    public DynamicKafkaSourceBuilder<T> setStreamPattern(Pattern streamPattern) {
        Preconditions.checkNotNull(streamPattern);
        ensureSubscriberIsNull("stream pattern");
        this.kafkaStreamSubscriber = new StreamPatternSubscriber(streamPattern);
        return this;
    }

    /**
     * Uses a caller-provided subscriber.
     *
     * @param kafkaStreamSubscriber the subscriber to use; must not be null
     * @return this builder
     * @throws IllegalStateException if a subscriber has already been configured
     */
    public DynamicKafkaSourceBuilder<T> setKafkaStreamSubscriber(KafkaStreamSubscriber kafkaStreamSubscriber) {
        Preconditions.checkNotNull(kafkaStreamSubscriber);
        ensureSubscriberIsNull("custom");
        this.kafkaStreamSubscriber = kafkaStreamSubscriber;
        return this;
    }

    /**
     * Switches the source to {@link Boundedness#BOUNDED} and records the stopping offsets.
     *
     * <p>NOTE(review): the argument is not null-checked here or in {@link #build()}; confirm
     * whether {@link DynamicKafkaSource} tolerates a null stopping initializer.
     *
     * @param stoppingOffsetsInitializer initializer for the offsets at which to stop
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
        this.boundedness = Boundedness.BOUNDED;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        return this;
    }

    /**
     * Sets the metadata service. Required; checked in {@link #build()}.
     *
     * @param kafkaMetadataService the metadata service
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setKafkaMetadataService(KafkaMetadataService kafkaMetadataService) {
        this.kafkaMetadataService = kafkaMetadataService;
        return this;
    }

    /**
     * Sets the record deserialization schema. Required; checked in {@link #build()}.
     *
     * @param recordDeserializer the deserialization schema
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setDeserializer(KafkaRecordDeserializationSchema<T> recordDeserializer) {
        this.deserializationSchema = recordDeserializer;
        return this;
    }

    /**
     * Sets the starting offsets initializer, replacing the default
     * {@link OffsetsInitializer#earliest()}. A null value is rejected by {@link #build()}.
     *
     * @param startingOffsetsInitializer initializer for the starting offsets
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setStartingOffsets(OffsetsInitializer startingOffsetsInitializer) {
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        return this;
    }

    /**
     * Copies every entry of the given properties into this builder's properties, replacing
     * entries with the same key.
     *
     * @param properties the properties to add
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setProperties(Properties properties) {
        this.props.putAll(properties);
        return this;
    }

    /**
     * Sets a single property.
     *
     * @param key property key
     * @param value property value
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setProperty(String key, String value) {
        this.props.setProperty(key, value);
        return this;
    }

    /**
     * Sets the {@code group.id} property.
     *
     * @param groupId the consumer group id
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setGroupId(String groupId) {
        return setProperty(GROUP_ID, groupId);
    }

    /**
     * Sets the client id prefix property ({@link KafkaSourceOptions#CLIENT_ID_PREFIX}).
     *
     * @param prefix the client id prefix
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setClientIdPrefix(String prefix) {
        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
    }

    /**
     * Validates the configuration, fills in the required consumer properties and creates the
     * source.
     *
     * @return the configured {@link DynamicKafkaSource}
     * @throws NullPointerException if the subscriber, metadata service, deserialization schema or
     *     starting offsets initializer is missing
     * @throws IllegalStateException if offset committing is enabled without a {@code group.id}
     */
    public DynamicKafkaSource<T> build() {
        logger.info("Building the DynamicKafkaSource");
        sanityCheck();
        setRequiredConsumerProperties();
        return new DynamicKafkaSource<>(
                this.kafkaStreamSubscriber,
                this.kafkaMetadataService,
                this.deserializationSchema,
                this.startingOffsetsInitializer,
                this.stoppingOffsetsInitializer,
                this.props,
                this.boundedness);
    }

    /**
     * Populates {@link #props} with the values the source needs. Some keys are forced (any user
     * value is replaced, with a warning); the rest are only set when the user left them unset.
     */
    private void setRequiredConsumerProperties() {
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        final boolean bounded = this.boundedness == Boundedness.BOUNDED;

        // The key/value deserializers are always forced to ByteArrayDeserializer.
        maybeOverride(KEY_DESERIALIZER, byteArrayDeserializer, true);
        maybeOverride(VALUE_DESERIALIZER, byteArrayDeserializer, true);

        if (!this.props.containsKey(GROUP_ID)) {
            logger.warn("Offset commit on checkpoint is disabled because {} is not specified", GROUP_ID);
            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
        }
        maybeOverride(ENABLE_AUTO_COMMIT, "false", false);

        // Locale.ROOT keeps the lowercase form locale-independent: with a Turkish default locale
        // "EARLIEST".toLowerCase() would yield "earlıest" (dotless i).
        final String autoOffsetReset =
                this.startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(Locale.ROOT);
        maybeOverride(AUTO_OFFSET_RESET, autoOffsetReset, true);

        // Default to "-1" / "0" when unset; in bounded mode these values are forced.
        maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", bounded);
        maybeOverride(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "-1", bounded);
        maybeOverride(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD.key(), "0", bounded);

        // Default client id prefix: the group id when present, otherwise a random suffix.
        final String defaultClientIdPrefix;
        if (this.props.containsKey(GROUP_ID)) {
            defaultClientIdPrefix = this.props.getProperty(GROUP_ID);
        } else {
            defaultClientIdPrefix = "DynamicKafkaSource-" + RandomStringUtils.randomAlphabetic(8);
        }
        maybeOverride(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), defaultClientIdPrefix, false);
    }

    /**
     * Sets {@code key} to {@code value} if it is currently unset, or, when {@code override} is
     * true, replaces an existing value and logs a warning.
     *
     * @param key property key
     * @param value value to apply
     * @param override whether an existing user-provided value may be replaced
     * @return true only if an existing value was replaced
     */
    private boolean maybeOverride(String key, String value, boolean override) {
        final String userValue = this.props.getProperty(key);
        if (userValue == null) {
            this.props.setProperty(key, value);
            return false;
        }
        if (!override) {
            return false;
        }
        logger.warn("Property {} is provided but will be overridden from {} to {}", key, userValue, value);
        this.props.setProperty(key, value);
        return true;
    }

    /** Verifies that all mandatory settings are present and mutually consistent. */
    private void sanityCheck() {
        Preconditions.checkNotNull(this.kafkaStreamSubscriber, "Kafka stream subscriber is required but not provided");
        Preconditions.checkNotNull(this.kafkaMetadataService, "Kafka Metadata Service is required but not provided");
        Preconditions.checkNotNull(this.deserializationSchema, "Deserialization schema is required but not provided.");
        // Fail here with a clear message instead of a bare NPE when the reset strategy is derived.
        Preconditions.checkNotNull(this.startingOffsetsInitializer, "Starting offsets initializer is required but not provided.");
        Preconditions.checkState(
                this.props.containsKey(GROUP_ID) || !offsetCommitEnabledManually(),
                String.format("Property %s is required when offset commit is enabled", GROUP_ID));
    }

    /**
     * @return true if the user explicitly set either {@code enable.auto.commit} or the
     *     commit-offsets-on-checkpoint option to a value that parses as {@code true}
     */
    private boolean offsetCommitEnabledManually() {
        return isExplicitlyTrue(ENABLE_AUTO_COMMIT)
                || isExplicitlyTrue(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key());
    }

    /** Returns true if {@code key} is present and its string value parses as boolean true. */
    private boolean isExplicitlyTrue(String key) {
        return this.props.containsKey(key) && Boolean.parseBoolean(this.props.getProperty(key));
    }

    /**
     * Guards against configuring more than one subscriber.
     *
     * @param attemptedSubscriber label of the subscriber kind being set, used in the error message
     * @throws IllegalStateException if a subscriber is already set
     */
    private void ensureSubscriberIsNull(String attemptedSubscriber) {
        if (this.kafkaStreamSubscriber != null) {
            throw new IllegalStateException(String.format("Cannot use %s for consumption because a %s is already set for consumption.", attemptedSubscriber, this.kafkaStreamSubscriber.getClass().getSimpleName()));
        }
    }
}
