package org.apache.flink.streaming.connectors.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.class */
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    private static final String ZOOKEEPER_HOSTNAME = "zookeeper";
    private static final int ZOOKEEPER_PORT = 2181;

    @Nullable
    private GenericContainer<?> zookeeper;

    @Nullable
    private Network network;
    private Properties standardProps;
    private KafkaTestEnvironment.Config config;
    private static final int REQUEST_TIMEOUT_SECONDS = 30;
    private final Map<Integer, KafkaContainer> brokers = new HashMap();
    private final Set<Integer> pausedBroker = new HashSet();
    private String brokerConnectionString = "";
    private FlinkKafkaProducer.Semantic producerSemantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
    private int zkTimeout = 30000;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl$KafkaOffsetHandlerImpl.class */
    private class KafkaOffsetHandlerImpl implements KafkaTestEnvironment.KafkaOffsetHandler {
        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public KafkaOffsetHandlerImpl() {
            Properties properties = new Properties();
            properties.putAll(KafkaTestEnvironmentImpl.this.standardProps);
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.offsetClient = new KafkaConsumer<>(properties);
        }

        @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.KafkaOffsetHandler
        public Long getCommittedOffset(String str, int i) {
            OffsetAndMetadata committed = this.offsetClient.committed(new TopicPartition(str, i));
            if (committed != null) {
                return Long.valueOf(committed.offset());
            }
            return null;
        }

        @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.KafkaOffsetHandler
        public void setCommittedOffset(String str, int i, long j) {
            HashMap hashMap = new HashMap();
            hashMap.put(new TopicPartition(str, i), new OffsetAndMetadata(j));
            this.offsetClient.commitSync(hashMap);
        }

        @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.KafkaOffsetHandler
        public void close() {
            this.offsetClient.close();
        }
    }

    public void setProducerSemantic(FlinkKafkaProducer.Semantic semantic) {
        this.producerSemantic = semantic;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void prepare(KafkaTestEnvironment.Config config) throws Exception {
        if (config.isSecureMode()) {
            config.setKafkaServersNumber(1);
            this.zkTimeout *= 15;
        }
        this.config = config;
        this.brokers.clear();
        LOG.info("Starting KafkaServer");
        startKafkaContainerCluster(config.getKafkaServersNumber());
        LOG.info("KafkaServer started.");
        this.standardProps = new Properties();
        this.standardProps.setProperty("bootstrap.servers", this.brokerConnectionString);
        this.standardProps.setProperty("group.id", "flink-tests");
        this.standardProps.setProperty("enable.auto.commit", "false");
        this.standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("auto.offset.reset", "earliest");
        this.standardProps.setProperty("max.partition.fetch.bytes", "256");
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void deleteTestTopic(String str) {
        LOG.info("Deleting topic {}", str);
        Properties secureProperties = getSecureProperties();
        secureProperties.putAll(getStandardProperties());
        String l = Long.toString(new Random().nextLong());
        secureProperties.put("client.id", l);
        AdminClient create = AdminClient.create(secureProperties);
        try {
            try {
                tryDelete(create, str);
                create.close(Duration.ofMillis(5000L));
                maybePrintDanglingThreadStacktrace(l);
            } catch (Exception e) {
                e.printStackTrace();
                Assertions.fail(String.format("Delete test topic : %s failed, %s", str, e.getMessage()));
                create.close(Duration.ofMillis(5000L));
                maybePrintDanglingThreadStacktrace(l);
            }
        } catch (Throwable th) {
            create.close(Duration.ofMillis(5000L));
            maybePrintDanglingThreadStacktrace(l);
            throw th;
        }
    }

    private void tryDelete(AdminClient adminClient, String str) throws Exception {
        try {
            adminClient.deleteTopics(Collections.singleton(str)).all().get();
            CommonTestUtils.waitUtil(() -> {
                try {
                    return Boolean.valueOf(((Collection) adminClient.listTopics().listings().get()).stream().map((v0) -> {
                        return v0.name();
                    }).noneMatch(str2 -> {
                        return str2.equals(str);
                    }));
                } catch (Exception e) {
                    LOG.warn("Exception caught when listing Kafka topics", e);
                    return false;
                }
            }, Duration.ofSeconds(30L), String.format("Topic \"%s\" was not deleted within timeout", str));
        } catch (TimeoutException e) {
            LOG.info("Did not receive delete topic response within {} seconds. Checking if it succeeded", Integer.valueOf(REQUEST_TIMEOUT_SECONDS));
            if (((Set) adminClient.listTopics().names().get()).contains(str)) {
                throw new Exception("Topic still exists after timeout", e);
            }
        }
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void createTestTopic(String str, int i, int i2, Properties properties) {
        LOG.info("Creating topic {}", str);
        try {
            AdminClient create = AdminClient.create(getStandardProperties());
            try {
                NewTopic newTopic = new NewTopic(str, i, (short) i2);
                create.createTopics(Collections.singleton(newTopic)).all().get();
                CommonTestUtils.waitUtil(() -> {
                    try {
                        Map map = (Map) create.describeTopics(Collections.singleton(str)).allTopicNames().get(30L, TimeUnit.SECONDS);
                        if (map == null || !map.containsKey(str)) {
                            return false;
                        }
                        return Boolean.valueOf(((TopicDescription) map.get(str)).partitions().size() == i);
                    } catch (Exception e) {
                        LOG.warn("Exception caught when describing Kafka topics", e);
                        return false;
                    }
                }, Duration.ofSeconds(30L), String.format("New topic \"%s\" is not ready within timeout", newTopic));
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assertions.fail("Create test topic : " + str + " failed, " + e.getMessage());
        }
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public Properties getStandardProperties() {
        return this.standardProps;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public Properties getSecureProperties() {
        Properties properties = new Properties();
        if (this.config.isSecureMode()) {
            properties.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
            properties.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
            properties.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
            properties.setProperty("metadata.fetch.timeout.ms", "120000");
        }
        return properties;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public String getBrokerConnectionString() {
        return this.brokerConnectionString;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public String getVersion() {
        return "confluentinc/cp-kafka:7.2.2";
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> list, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        return new FlinkKafkaConsumer(list, kafkaDeserializationSchema, properties);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> KafkaSourceBuilder<T> getSourceBuilder(List<String> list, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        return KafkaSource.builder().setTopics(list).setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializationSchema)).setProperties(properties);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String str) {
        return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(str, properties));
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> StreamSink<T> getProducerSink(String str, SerializationSchema<T> serializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return new StreamSink<>(new FlinkKafkaProducer(str, serializationSchema, properties, flinkKafkaPartitioner, this.producerSemantic, 5));
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, KeyedSerializationSchema<T> keyedSerializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return dataStream.addSink(new FlinkKafkaProducer(str, keyedSerializationSchema, properties, Optional.ofNullable(flinkKafkaPartitioner), this.producerSemantic, 5));
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, SerializationSchema<T> serializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return dataStream.addSink(new FlinkKafkaProducer(str, serializationSchema, properties, flinkKafkaPartitioner, this.producerSemantic, 5));
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, KafkaSerializationSchema<T> kafkaSerializationSchema, Properties properties) {
        return dataStream.addSink(new FlinkKafkaProducer(str, kafkaSerializationSchema, properties, this.producerSemantic));
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler() {
        return new KafkaOffsetHandlerImpl();
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void restartBroker(int i) throws Exception {
        unpause(i);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void stopBroker(int i) throws Exception {
        pause(i);
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public int getLeaderToShutDown(String str) throws Exception {
        AdminClient create = AdminClient.create(getStandardProperties());
        try {
            int id = ((TopicPartitionInfo) ((TopicDescription) ((Map) create.describeTopics(Collections.singleton(str)).allTopicNames().get()).get(str)).partitions().get(0)).leader().id();
            if (create != null) {
                create.close();
            }
            return id;
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public boolean isSecureRunSupported() {
        return true;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment
    public void shutdown() throws Exception {
        this.brokers.values().forEach((v0) -> {
            v0.stop();
        });
        this.brokers.clear();
        if (this.zookeeper != null) {
            this.zookeeper.stop();
        }
        if (this.network != null) {
            this.network.close();
        }
    }

    private void startKafkaContainerCluster(int i) {
        if (i > 1) {
            this.network = Network.newNetwork();
            this.zookeeper = createZookeeperContainer(this.network);
            this.zookeeper.start();
            LOG.info("Zookeeper container started");
        }
        for (int i2 = 0; i2 < i; i2++) {
            this.brokers.put(Integer.valueOf(i2), createKafkaContainer(i2, this.zookeeper));
        }
        new ArrayList(this.brokers.values()).parallelStream().forEach((v0) -> {
            v0.start();
        });
        LOG.info("{} brokers started", Integer.valueOf(i));
        this.brokerConnectionString = (String) this.brokers.values().stream().map((v0) -> {
            return v0.getBootstrapServers();
        }).map(str -> {
            return str.split("://")[1];
        }).collect(Collectors.joining(","));
    }

    private GenericContainer<?> createZookeeperContainer(Network network) {
        return new GenericContainer(DockerImageName.parse("zookeeper:3.4.14")).withNetwork(network).withNetworkAliases(new String[]{ZOOKEEPER_HOSTNAME}).withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private KafkaContainer createKafkaContainer(int i, @Nullable GenericContainer<?> genericContainer) {
        String format = String.format("Kafka-%d", Integer.valueOf(i));
        KafkaContainer withEnv = KafkaUtil.createKafkaContainer("confluentinc/cp-kafka:7.2.2", LOG, format).withNetworkAliases(new String[]{format}).withEnv("KAFKA_BROKER_ID", String.valueOf(i)).withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(52428800)).withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(52428800)).withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", Integer.toString(7200000)).withEnv("KAFKA_LOG_RETENTION_MS", "-1").withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS", String.valueOf(this.zkTimeout)).withEnv("KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS", String.valueOf(this.zkTimeout));
        if (genericContainer != null) {
            withEnv.dependsOn(new Startable[]{genericContainer}).withNetwork(genericContainer.getNetwork()).withExternalZookeeper(String.format("%s:%d", ZOOKEEPER_HOSTNAME, Integer.valueOf(ZOOKEEPER_PORT)));
        } else {
            withEnv.withEmbeddedZookeeper();
        }
        return withEnv;
    }

    private void pause(int i) {
        if (this.pausedBroker.contains(Integer.valueOf(i))) {
            LOG.warn("Broker {} is already paused. Skipping pause operation", Integer.valueOf(i));
            return;
        }
        DockerClientFactory.instance().client().pauseContainerCmd(this.brokers.get(Integer.valueOf(i)).getContainerId()).exec();
        this.pausedBroker.add(Integer.valueOf(i));
        LOG.info("Broker {} is paused", Integer.valueOf(i));
    }

    private void unpause(int i) throws Exception {
        if (!this.pausedBroker.contains(Integer.valueOf(i))) {
            LOG.warn("Broker {} is already running. Skipping unpause operation", Integer.valueOf(i));
            return;
        }
        DockerClientFactory.instance().client().unpauseContainerCmd(this.brokers.get(Integer.valueOf(i)).getContainerId()).exec();
        AdminClient create = AdminClient.create(getStandardProperties());
        try {
            CommonTestUtils.waitUtil(() -> {
                try {
                    return Boolean.valueOf(((Collection) create.describeCluster().nodes().get()).stream().anyMatch(node -> {
                        return node.id() == i;
                    }));
                } catch (Exception e) {
                    return false;
                }
            }, Duration.ofSeconds(30L), String.format("The paused broker %d is not recovered within timeout", Integer.valueOf(i)));
            if (create != null) {
                create.close();
            }
            this.pausedBroker.remove(Integer.valueOf(i));
            LOG.info("Broker {} is resumed", Integer.valueOf(i));
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
