package org.apache.flink.connector.kafka.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/flink/connector/kafka/testutils/KafkaUtil.class */
/**
 * Static helpers for Kafka-based tests: building a pre-configured {@link KafkaContainer} and
 * reading every record currently stored in a topic.
 *
 * <p>This class only exposes static methods and cannot be instantiated.
 */
public class KafkaUtil {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);

    /** Maximum time a single {@code poll} call waits while draining a topic. */
    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);

    private KafkaUtil() {
    }

    /**
     * Creates a Kafka container whose output is forwarded to {@code logger} without a prefix.
     *
     * @param str the Docker image name, parsed with {@link DockerImageName#parse(String)}
     * @param logger the logger receiving the container output; its enabled level also determines
     *     the log level configured inside the container
     * @return the configured (not yet started) container
     */
    public static KafkaContainer createKafkaContainer(String str, Logger logger) {
        return createKafkaContainer(str, logger, null);
    }

    /**
     * Creates a Kafka container whose output is forwarded to {@code logger}.
     *
     * <p>The container is configured with a transaction state log replication factor and minimum
     * ISR of 1, an offsets topic replication factor of 1, Confluent support metrics disabled, a
     * maximum transaction timeout of two hours, and Kafka log levels derived from the most verbose
     * level enabled on {@code logger}.
     *
     * @param str the Docker image name, parsed with {@link DockerImageName#parse(String)}
     * @param logger the logger receiving the container output; its enabled level also determines
     *     the log level configured inside the container
     * @param str2 prefix applied to forwarded log lines; ignored when {@code null} or
     *     whitespace-only
     * @return the configured (not yet started) container
     */
    public static KafkaContainer createKafkaContainer(String str, Logger logger, String str2) {
        final String logLevel = inferKafkaLogLevel(logger);

        final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger);
        if (!StringUtils.isNullOrWhitespaceOnly(str2)) {
            logConsumer.withPrefix(str2);
        }

        return new KafkaContainer(DockerImageName.parse(str))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
                .withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
                .withEnv(
                        "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                        String.valueOf(Duration.ofHours(2L).toMillis()))
                .withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
                .withLogConsumer(logConsumer);
    }

    /**
     * Reads all records of a topic using the requested transactional isolation level.
     *
     * <p>The given properties are copied; the caller's instance is not modified.
     *
     * @param str the topic to read
     * @param properties base consumer configuration
     * @param z {@code true} to set {@code isolation.level} to {@code read_committed}, {@code
     *     false} to set it to {@code read_uncommitted}
     * @return the records returned by the consumer while draining the topic
     * @throws KafkaException declared for failures raised by the Kafka consumer
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String str, Properties properties, boolean z) throws KafkaException {
        final Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("isolation.level", z ? "read_committed" : "read_uncommitted");
        return drainAllRecordsFromTopic(str, consumerConfig);
    }

    /**
     * Reads all records of a topic from the beginning up to the end offsets observed when the
     * method starts.
     *
     * <p>The given properties are copied and the key/value deserializers are overridden with
     * {@link ByteArrayDeserializer}. End offsets are captured once before polling; a partition is
     * dropped from the assignment as soon as the consumer position reaches its captured end
     * offset, and the method returns once no partition is left.
     *
     * @param str the topic to read
     * @param properties base consumer configuration
     * @return the records returned by the consumer while draining the topic
     * @throws KafkaException declared for failures raised by the Kafka consumer
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String str, Properties properties) throws KafkaException {
        final Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            final Set<TopicPartition> remainingPartitions = getAllPartitions(consumer, str);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(remainingPartitions);
            consumer.assign(remainingPartitions);
            consumer.seekToBeginning(remainingPartitions);

            final List<ConsumerRecord<byte[], byte[]>> drained = new ArrayList<>();
            while (!remainingPartitions.isEmpty()) {
                final ConsumerRecords<byte[], byte[]> polled = consumer.poll(CONSUMER_POLL_DURATION);
                LOG.debug("Fetched {} records from topic {}.", polled.count(), str);

                // Collect finished partitions separately: the set must not be modified while it
                // is being iterated.
                final List<TopicPartition> finishedPartitions = new ArrayList<>();
                for (TopicPartition partition : remainingPartitions) {
                    final long position = consumer.position(partition);
                    final long endOffset = endOffsets.get(partition);
                    LOG.debug(
                            "Endoffset {} and current position {} for partition {}",
                            endOffset,
                            position,
                            partition.partition());
                    if (endOffset - position <= 0) {
                        finishedPartitions.add(partition);
                    }
                }

                // Stop fetching from partitions that have reached their captured end offset.
                if (remainingPartitions.removeAll(finishedPartitions)) {
                    consumer.assign(remainingPartitions);
                }

                // Records of this poll are kept even for partitions that were just removed.
                for (ConsumerRecord<byte[], byte[]> consumerRecord : polled) {
                    drained.add(consumerRecord);
                }
            }
            return drained;
        }
    }

    /**
     * Maps the most verbose level enabled on the given logger to the level name passed to the
     * Kafka container, falling back to {@code "OFF"} when no level is enabled.
     */
    private static String inferKafkaLogLevel(Logger logger) {
        if (logger.isTraceEnabled()) {
            return "TRACE";
        }
        if (logger.isDebugEnabled()) {
            return "DEBUG";
        }
        if (logger.isInfoEnabled()) {
            return "INFO";
        }
        if (logger.isWarnEnabled()) {
            return "WARN";
        }
        if (logger.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Returns the partitions reported by the consumer for the given topic.
     *
     * <p>The result is an explicitly mutable {@link HashSet} because the drain loop removes
     * finished partitions from it; {@code Collectors.toSet()} makes no mutability guarantee.
     */
    private static Set<TopicPartition> getAllPartitions(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toCollection(HashSet::new));
    }
}
