package org.apache.flink.connector.kafka.testutils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.class */
/**
 * Shared test environment for Kafka source tests.
 *
 * <p>Holds one static {@link AdminClient} and one static {@link KafkaConsumer} (both created in
 * {@link #setup()} and released in {@link #tearDown()}) and offers helpers to create topics,
 * generate deterministic records, and pre-set earliest / committed offsets.
 *
 * <p>Conventions used by the helpers in this class:
 *
 * <ul>
 *   <li>every partition receives {@link #NUM_RECORDS_PER_PARTITION} records whose value is the
 *       record index and whose key is {@code TopicPartition.toString()};
 *   <li>the expected earliest offset of a partition equals its partition index;
 *   <li>the expected committed offset of a partition equals its partition index plus two.
 * </ul>
 */
public class KafkaSourceTestEnv extends KafkaTestBase {
    public static final String GROUP_ID = "KafkaSourceTestEnv";
    public static final int NUM_PARTITIONS = 10;
    public static final int NUM_RECORDS_PER_PARTITION = 10;
    private static AdminClient adminClient;
    private static KafkaConsumer<String, Integer> consumer;

    /**
     * Prepares the inherited test base and creates the shared admin client and consumer.
     *
     * @throws Throwable if preparing the test base or creating a client fails
     */
    public static void setup() throws Throwable {
        prepare();
        adminClient = getAdminClient();
        consumer = getConsumer();
    }

    /**
     * Closes the shared consumer and admin client and shuts down the services of the test base.
     *
     * <p>Each step runs even if an earlier one throws, and clients that were never created
     * (for example because {@link #setup()} failed half-way) are skipped instead of causing a
     * {@link NullPointerException}.
     *
     * @throws Exception if closing a client or shutting down the services fails
     */
    public static void tearDown() throws Exception {
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            try {
                if (adminClient != null) {
                    adminClient.close();
                }
            } finally {
                shutDownServices();
            }
        }
    }

    /**
     * Creates a new admin client configured with a copy of {@code standardProps}.
     *
     * @return a new {@link AdminClient}; the caller owns it and must close it
     */
    public static AdminClient getAdminClient() {
        return AdminClient.create(copyOfStandardProps());
    }

    /**
     * Creates a new consumer in group {@link #GROUP_ID} with String keys and Integer values.
     *
     * @return a new {@link KafkaConsumer}; the caller owns it and must close it
     */
    public static KafkaConsumer<String, Integer> getConsumer() {
        Properties props = copyOfStandardProps();
        props.setProperty("group.id", GROUP_ID);
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", IntegerDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    /**
     * Builds consumer properties for group {@link #GROUP_ID} that use the same deserializer class
     * for both keys and values.
     *
     * @param deserializerClass class whose name is set as key and value deserializer
     * @return a fresh {@link Properties} instance based on {@code standardProps}
     */
    public static Properties getConsumerProperties(Class<?> deserializerClass) {
        Properties props = copyOfStandardProps();
        props.setProperty("group.id", GROUP_ID);
        props.setProperty("key.deserializer", deserializerClass.getName());
        props.setProperty("value.deserializer", deserializerClass.getName());
        return props;
    }

    /**
     * Groups the splits of all partitions of the given topics by owning subtask.
     *
     * <p>The owner of a partition is derived from the partition's hash code modulo
     * {@code numSubtasks}. Each split starts at {@link #getEarliestOffset(TopicPartition)} and
     * stops at {@link #NUM_RECORDS_PER_PARTITION}.
     *
     * @param topics topics whose partitions are turned into splits
     * @param numSubtasks number of owners to distribute the splits over
     * @return owner index to (split id to split)
     */
    public static Map<Integer, Map<String, KafkaPartitionSplit>> getSplitsByOwners(
            Collection<String> topics, int numSubtasks) {
        Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners = new HashMap<>();
        for (String topic : topics) {
            for (TopicPartition tp : getPartitionsForTopic(topic)) {
                // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative,
                // whereas the remainder always has a magnitude below numSubtasks. For every other
                // hash code the result equals Math.abs(hash) % numSubtasks.
                int owner = Math.abs(tp.hashCode() % numSubtasks);
                splitsByOwners
                        .computeIfAbsent(owner, ignored -> new HashMap<>())
                        .put(
                                KafkaPartitionSplit.toSplitId(tp),
                                new KafkaPartitionSplit(
                                        tp, getEarliestOffset(tp), NUM_RECORDS_PER_PARTITION));
            }
        }
        return splitsByOwners;
    }

    /**
     * Generates {@link #NUM_RECORDS_PER_PARTITION} records for one partition.
     *
     * <p>Record {@code i} has timestamp {@code i * 1000}, key {@code topicPartition.toString()}
     * and value {@code i}.
     *
     * @param topicPartition target partition
     * @return the generated records, in index order
     */
    public static List<ProducerRecord<String, Integer>> getRecordsForPartition(
            TopicPartition topicPartition) {
        List<ProducerRecord<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            records.add(
                    new ProducerRecord<>(
                            topicPartition.topic(),
                            topicPartition.partition(),
                            i * 1000L,
                            topicPartition.toString(),
                            i));
        }
        return records;
    }

    /**
     * Same as {@link #getRecordsForPartition(TopicPartition)} but with a {@code null} timestamp
     * on every record.
     *
     * @param topicPartition target partition
     * @return the generated records, in index order
     */
    public static List<ProducerRecord<String, Integer>> getRecordsForPartitionWithoutTimestamp(
            TopicPartition topicPartition) {
        List<ProducerRecord<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            records.add(
                    new ProducerRecord<>(
                            topicPartition.topic(),
                            topicPartition.partition(),
                            (Long) null,
                            topicPartition.toString(),
                            i));
        }
        return records;
    }

    /**
     * Generates timestamped records for every partition of a topic.
     *
     * @param topic topic whose partitions are looked up via the shared consumer
     * @return records for all partitions, concatenated partition by partition
     */
    public static List<ProducerRecord<String, Integer>> getRecordsForTopic(String topic) {
        List<ProducerRecord<String, Integer>> records = new ArrayList<>();
        for (TopicPartition tp : getPartitionsForTopic(topic)) {
            records.addAll(getRecordsForPartition(tp));
        }
        return records;
    }

    /**
     * Generates records without timestamps for every partition of a topic.
     *
     * @param topic topic whose partitions are looked up via the shared consumer
     * @return records for all partitions, concatenated partition by partition
     */
    public static List<ProducerRecord<String, Integer>> getRecordsForTopicWithoutTimestamp(
            String topic) {
        List<ProducerRecord<String, Integer>> records = new ArrayList<>();
        for (TopicPartition tp : getPartitionsForTopic(topic)) {
            records.addAll(getRecordsForPartitionWithoutTimestamp(tp));
        }
        return records;
    }

    /**
     * Collects the partitions of several topics into one list.
     *
     * @param topics topics to look up
     * @return all partitions, in the iteration order of {@code topics}
     */
    public static List<TopicPartition> getPartitionsForTopics(Collection<String> topics) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (String topic : topics) {
            partitions.addAll(getPartitionsForTopic(topic));
        }
        return partitions;
    }

    /**
     * Looks up the partitions of a topic through the shared consumer.
     *
     * @param topic topic to look up
     * @return one {@link TopicPartition} per partition reported by the consumer
     */
    public static List<TopicPartition> getPartitionsForTopic(String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
    }

    /**
     * Maps each partition to its expected earliest offset.
     *
     * @param partitions partitions to map
     * @return partition to {@link #getEarliestOffset(TopicPartition)}
     */
    public static Map<TopicPartition, Long> getEarliestOffsets(List<TopicPartition> partitions) {
        Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            earliestOffsets.put(tp, getEarliestOffset(tp));
        }
        return earliestOffsets;
    }

    /**
     * Maps each partition to its expected committed offset, which is the partition index plus two.
     *
     * @param partitions partitions to map
     * @return partition to expected committed offset
     */
    public static Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(
            List<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            committedOffsets.put(tp, new OffsetAndMetadata(tp.partition() + 2));
        }
        return committedOffsets;
    }

    /**
     * Returns the expected earliest offset of a partition, which is its partition index.
     *
     * @param topicPartition partition to inspect
     * @return {@code topicPartition.partition()} widened to {@code long}
     */
    public static long getEarliestOffset(TopicPartition topicPartition) {
        return topicPartition.partition();
    }

    /**
     * Creates a test topic with {@link #NUM_PARTITIONS} partitions and replication factor 1.
     *
     * @param topic name of the topic to create
     */
    public static void createTestTopic(String topic) {
        createTestTopic(topic, NUM_PARTITIONS, 1);
    }

    /**
     * Applies {@link #setupEarliestOffsets(List)} to all partitions of a topic.
     *
     * @param topic topic whose partitions are adjusted
     * @throws Throwable if deleting the records fails
     */
    public static void setupEarliestOffsets(String topic) throws Throwable {
        setupEarliestOffsets(getPartitionsForTopic(topic));
    }

    /**
     * Deletes, through the shared admin client, all records before the expected earliest offset
     * of each given partition and waits for the deletion to complete.
     *
     * @param partitions partitions to adjust
     * @throws Throwable if deleting the records fails
     */
    public static void setupEarliestOffsets(List<TopicPartition> partitions) throws Throwable {
        Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        getEarliestOffsets(partitions)
                .forEach((tp, offset) -> toDelete.put(tp, RecordsToDelete.beforeOffset(offset)));
        adminClient.deleteRecords(toDelete).all().get();
    }

    /**
     * Commits the expected committed offsets for all partitions of a topic using the shared
     * consumer, then asserts via the admin client that group {@link #GROUP_ID} reports exactly
     * those offsets.
     *
     * @param topic topic whose partitions get committed offsets
     * @throws ExecutionException if fetching the group offsets fails
     * @throws InterruptedException if interrupted while waiting for the group offsets
     */
    public static void setupCommittedOffsets(String topic)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> expected =
                getCommittedOffsets(getPartitionsForTopic(topic));
        consumer.commitSync(expected);

        ListConsumerGroupOffsetsOptions options =
                new ListConsumerGroupOffsetsOptions()
                        .topicPartitions(new ArrayList<>(expected.keySet()));
        Map<TopicPartition, OffsetAndMetadata> actual =
                adminClient
                        .listConsumerGroupOffsets(GROUP_ID, options)
                        .partitionsToOffsetAndMetadata()
                        .get();
        Assertions.assertThat(actual).as("The offsets are not committed").isEqualTo(expected);
    }

    /**
     * Produces the given records using String key and Integer value serializers.
     *
     * @param records records to send
     * @throws Throwable if producing fails
     */
    public static void produceToKafka(Collection<ProducerRecord<String, Integer>> records)
            throws Throwable {
        produceToKafka(records, StringSerializer.class, IntegerSerializer.class);
    }

    /**
     * Creates a topic, fills it with generated records and optionally adjusts its offsets.
     *
     * @param topic name of the topic to create
     * @param setupEarliestOffsets whether to call {@link #setupEarliestOffsets(String)}
     * @param setupCommittedOffsets whether to call {@link #setupCommittedOffsets(String)}
     * @param testDataGenerator produces the records to write, given the topic name
     * @throws Throwable if any of the steps fails
     */
    public static void setupTopic(
            String topic,
            boolean setupEarliestOffsets,
            boolean setupCommittedOffsets,
            Function<String, List<ProducerRecord<String, Integer>>> testDataGenerator)
            throws Throwable {
        createTestTopic(topic);
        produceToKafka(testDataGenerator.apply(topic));
        if (setupEarliestOffsets) {
            setupEarliestOffsets(topic);
        }
        if (setupCommittedOffsets) {
            setupCommittedOffsets(topic);
        }
    }

    /** Returns a fresh {@link Properties} instance holding a copy of {@code standardProps}. */
    private static Properties copyOfStandardProps() {
        Properties props = new Properties();
        props.putAll(standardProps);
        return props;
    }
}
