package org.apache.flink.connector.kafka.testutils;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * External context used by source tests to provision Kafka topics/partitions, hand out writers
 * for them, and build a {@link KafkaSource} that reads the test topics.
 *
 * <p>Each call to {@link #createSourceSplitDataWriter(TestingSourceSettings)} creates one new
 * split. Depending on the {@link SplitMappingMode}, a split is either a brand-new
 * single-partition topic or an additional partition on this context's topic.
 */
public class KafkaSourceExternalContext implements DataStreamSourceExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceExternalContext.class);

    private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
    private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_PREFIX + ".*");
    private static final String GROUP_ID_PREFIX = "kafka-source-external-context-";
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;
    /** Upper bound (inclusive) for the length of the random payload of one record. */
    private static final int MAX_RECORD_PAYLOAD_LENGTH = 50;

    private final List<URL> connectorJarPaths;
    private final String bootstrapServers;
    private final SplitMappingMode splitMappingMode;
    private final String topicName;
    private final AdminClient adminClient;
    /** Every writer handed out so far; closed (and their topics deleted) in {@link #close()}. */
    private final List<KafkaPartitionDataWriter> writers = new ArrayList<>();

    /** Determines what a "split" created by this context maps to in Kafka. */
    public enum SplitMappingMode {
        /** Every new split is a new single-partition topic. */
        TOPIC,
        /** Every new split is an additional partition on one shared topic. */
        PARTITION
    }

    /**
     * Creates the context and opens an admin client against the given brokers.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} client property
     * @param splitMappingMode how new splits are mapped onto Kafka topics/partitions
     * @param connectorJarPaths connector jars returned by {@link #getConnectorJarPaths()}
     */
    public KafkaSourceExternalContext(
            String bootstrapServers,
            SplitMappingMode splitMappingMode,
            List<URL> connectorJarPaths) {
        this.connectorJarPaths = connectorJarPaths;
        this.bootstrapServers = bootstrapServers;
        this.splitMappingMode = splitMappingMode;
        this.topicName = randomize(TOPIC_NAME_PREFIX);
        // Must happen after bootstrapServers is assigned: createAdminClient() reads that field,
        // and a field initializer would run before this constructor body and see null.
        this.adminClient = createAdminClient();
    }

    /** Returns the connector jar URLs this context was constructed with. */
    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    /**
     * Builds a Kafka source subscribed to every topic matching the test-topic name pattern, with
     * a randomized consumer group id and a value-only string deserializer.
     *
     * @param testingSourceSettings settings of the test; a bounded test stops the source at the
     *     latest offsets
     * @return the configured source
     */
    public Source<String, ?, ?> createSource(TestingSourceSettings testingSourceSettings) {
        final KafkaSourceBuilder<String> sourceBuilder =
                KafkaSource.<String>builder()
                        .setBootstrapServers(this.bootstrapServers)
                        .setTopicPattern(TOPIC_NAME_PATTERN)
                        .setGroupId(randomize(GROUP_ID_PREFIX))
                        .setDeserializer(
                                KafkaRecordDeserializationSchema.valueOnly(
                                        StringDeserializer.class));
        if (testingSourceSettings.getBoundedness().equals(Boundedness.BOUNDED)) {
            sourceBuilder.setBounded(OffsetsInitializer.latest());
        }
        return sourceBuilder.build();
    }

    /**
     * Creates a new split (topic or partition, depending on the mapping mode) and returns a
     * writer for it. The writer is remembered so that {@link #close()} can clean it up.
     *
     * @param testingSourceSettings settings of the test (not used by this implementation)
     * @return writer targeting the newly created split
     * @throws RuntimeException if the split could not be created
     */
    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
            TestingSourceSettings testingSourceSettings) {
        try {
            final KafkaPartitionDataWriter splitWriter;
            switch (this.splitMappingMode) {
                case TOPIC:
                    // The number of existing writers doubles as the index of the next topic.
                    splitWriter = createSinglePartitionTopic(this.writers.size());
                    break;
                case PARTITION:
                    splitWriter = scaleOutTopic(this.topicName);
                    break;
                default:
                    throw new IllegalArgumentException(
                            "Split mode should be either TOPIC or PARTITION");
            }
            this.writers.add(splitWriter);
            return splitWriter;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create new splits", e);
        }
    }

    /**
     * Generates between {@code NUM_RECORDS_LOWER_BOUND} (inclusive) and
     * {@code NUM_RECORDS_UPPER_BOUND} (exclusive) records of the form
     * {@code <splitIndex>-<random alphanumeric string>}.
     *
     * <p>All randomness is drawn from a single {@link Random} created from {@code seed}, so the
     * same arguments always yield the same records.
     *
     * @param testingSourceSettings settings of the test (not used by this implementation)
     * @param splitIndex index of the split, used as the record prefix
     * @param seed seed for the random generator
     * @return the generated records
     */
    public List<String> generateTestData(
            TestingSourceSettings testingSourceSettings, int splitIndex, long seed) {
        final Random random = new Random(seed);
        final int recordCount =
                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
                        + NUM_RECORDS_LOWER_BOUND;
        final List<String> records = new ArrayList<>(recordCount);
        for (int n = 0; n < recordCount; n++) {
            final int payloadLength = random.nextInt(MAX_RECORD_PAYLOAD_LENGTH) + 1;
            // Same alphanumeric alphabet as randomAlphanumeric(), but driven by the seeded
            // generator so that the payloads are reproducible from the seed.
            final String payload =
                    RandomStringUtils.random(payloadLength, 0, 0, true, true, null, random);
            records.add(splitIndex + "-" + payload);
        }
        return records;
    }

    /** Returns the type information for the {@code String} records produced by the source. */
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    /**
     * Closes every writer created by this context, deletes the topics they wrote to, and
     * releases the admin client.
     *
     * @throws Exception if closing a writer or deleting the topics fails
     */
    public void close() throws Exception {
        try {
            final List<String> topicsToDelete = new ArrayList<>();
            for (KafkaPartitionDataWriter writer : this.writers) {
                final String topic = writer.getTopicPartition().topic();
                // In PARTITION mode all writers share one topic; request its deletion only once.
                if (!topicsToDelete.contains(topic)) {
                    topicsToDelete.add(topic);
                }
                writer.close();
            }
            this.adminClient.deleteTopics(topicsToDelete).all().get();
        } finally {
            // Always release the admin client, even when the cleanup above failed.
            this.adminClient.close();
        }
    }

    @Override
    public String toString() {
        return "KafkaSource-" + this.splitMappingMode.toString();
    }

    /** Appends a random non-negative long to {@code prefix}. */
    private static String randomize(String prefix) {
        return prefix + ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE);
    }

    /** Creates an admin client connected to this context's bootstrap servers. */
    private AdminClient createAdminClient() {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(config);
    }

    /**
     * Creates the topic {@code <topicName>-<topicIndex>} with one partition and replication
     * factor 1, and returns a writer for its partition 0.
     */
    private KafkaPartitionDataWriter createSinglePartitionTopic(int topicIndex) throws Exception {
        final String newTopicName = this.topicName + "-" + topicIndex;
        LOG.info("Creating topic '{}'", newTopicName);
        this.adminClient
                .createTopics(Collections.singletonList(new NewTopic(newTopicName, 1, (short) 1)))
                .all()
                .get();
        return new KafkaPartitionDataWriter(
                getKafkaProducerProperties(topicIndex), new TopicPartition(newTopicName, 0));
    }

    /**
     * Adds one partition to {@code topic} and returns a writer for it. If the topic does not
     * exist yet it is created with a single partition and the writer targets partition 0.
     */
    private KafkaPartitionDataWriter scaleOutTopic(String topic) throws Exception {
        final Set<String> existingTopics = this.adminClient.listTopics().names().get();
        if (!existingTopics.contains(topic)) {
            LOG.info("Creating topic '{}'", topic);
            this.adminClient
                    .createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)))
                    .all()
                    .get();
            return new KafkaPartitionDataWriter(
                    getKafkaProducerProperties(0), new TopicPartition(topic, 0));
        }

        final Map<String, TopicDescription> descriptions =
                this.adminClient
                        .describeTopics(Collections.singletonList(topic))
                        .allTopicNames()
                        .get();
        // Partitions are numbered from 0, so the current count is the index of the new partition.
        final int currentPartitionCount = descriptions.get(topic).partitions().size();
        final int targetPartitionCount = currentPartitionCount + 1;
        LOG.info("Creating partition {} for topic '{}'", targetPartitionCount, topic);
        this.adminClient
                .createPartitions(
                        Collections.singletonMap(
                                topic, NewPartitions.increaseTo(targetPartitionCount)))
                .all()
                .get();
        return new KafkaPartitionDataWriter(
                getKafkaProducerProperties(currentPartitionCount),
                new TopicPartition(topic, currentPartitionCount));
    }

    /**
     * Builds producer properties with byte-array key/value serializers and a client id of the
     * form {@code flink-kafka-split-writer-<producerId>-<random long>}.
     */
    private Properties getKafkaProducerProperties(int producerId) {
        final Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", this.bootstrapServers);
        producerConfig.setProperty(
                "client.id",
                String.join(
                        "-",
                        "flink-kafka-split-writer",
                        Integer.toString(producerId),
                        Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
        producerConfig.setProperty("key.serializer", ByteArraySerializer.class.getName());
        producerConfig.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return producerConfig;
    }
}
