package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.List;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/integration/AbstractResetIntegrationTest.class */
public abstract class AbstractResetIntegrationTest {
    // NOTE(review): presumably the broker count used by subclasses when creating the embedded cluster — confirm.
    static final int NUM_BROKERS = 1;
    // Base application id; the per-test id is built as APP_ID + testNo.
    private static final String APP_ID = "cleanup-integration-test";
    // Topics that are deleted and re-created before each test by prepareInputData().
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    // User-managed intermediate topic, created and deleted by the test that uses it.
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    // NOTE(review): presumably milliseconds; the same values appear as literals ("2000", 10000L) in the methods of this class — confirm.
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000;
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000;
    private static final int TIMEOUT_MULTIPLIER = 5;
    // Not assigned anywhere in this class; expected to be set before beforePrepareTest() runs (presumably by the concrete subclass — confirm).
    static EmbeddedKafkaCluster cluster;
    // Both refreshed from `cluster` at the start of beforePrepareTest().
    static String bootstrapServers;
    static MockTime mockTime;
    // Condition passed to TestUtils.waitForCondition to wait for the consumer group to become empty.
    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
    private static final Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class);
    // Admin clients are created lazily in beforePrepareTest() and released in afterClassGlobalCleanup().
    private static AdminClient adminClient = null;
    private static KafkaAdminClient kafkaAdminClient = null;
    // Counter appended to APP_ID so that every test uses its own application id.
    private static int testNo = 0;

    /* loaded from: input_file:org/apache/kafka/streams/integration/AbstractResetIntegrationTest$WaitUntilConsumerGroupGotClosed.class */
    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
        private WaitUntilConsumerGroupGotClosed() {
        }

        public boolean conditionMet() {
            return ((List) AbstractResetIntegrationTest.adminClient.describeConsumerGroup(AbstractResetIntegrationTest.APP_ID, 0L).consumers().get()).isEmpty();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the shared admin clients, if they were created, and resets the static
     * references so that a later test class creates fresh ones.
     */
    public static void afterClassGlobalCleanup() {
        final AdminClient legacyClient = adminClient;
        if (legacyClient != null) {
            legacyClient.close();
            adminClient = null;
        }

        final KafkaAdminClient currentClient = kafkaAdminClient;
        if (currentClient != null) {
            // bounded close: wait at most ten seconds
            currentClient.close(10L, TimeUnit.SECONDS);
            kafkaAdminClient = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Per-test setup: advances the test counter, refreshes the cluster handles, creates the
     * shared admin clients on first use, waits until the consumer group is reported inactive
     * and finally (re-)creates the topics and input data.
     *
     * @throws Exception if waiting is interrupted or preparing the input data fails
     */
    public void beforePrepareTest() throws Exception {
        // every test gets its own application id (APP_ID + testNo)
        testNo++;
        mockTime = cluster.time;
        bootstrapServers = cluster.bootstrapServers();
        // start the mock clock at the next full second of wall-clock time
        mockTime.setCurrentTimeMs(((System.currentTimeMillis() / 1000) + 1) * 1000);

        Properties adminConfig = getClientSslConfig();
        if (adminConfig == null) {
            adminConfig = new Properties();
            adminConfig.put("bootstrap.servers", bootstrapServers);
        }
        if (adminClient == null) {
            adminClient = AdminClient.create(adminConfig);
        }
        if (kafkaAdminClient == null) {
            kafkaAdminClient = org.apache.kafka.clients.admin.AdminClient.create(adminConfig);
        }

        final long groupTimeoutMs = TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT;
        boolean groupInactive = false;
        while (!groupInactive) {
            Thread.sleep(50L);
            try {
                TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
                    "Test consumer group active even after waiting " + groupTimeoutMs + " ms.");
                groupInactive = true;
            } catch (final TimeoutException ignored) {
                // Deliberate retry: presumably the group lookup times out while the cluster
                // is still starting up, so keep polling until it answers.
            }
        }

        // Kept outside the retry loop so that a producer timeout fails the test
        // instead of silently re-creating the topics forever.
        prepareInputData();
    }

    /**
     * Hook for supplying SSL client settings.
     *
     * @return {@code null} in this base implementation; callers in this class treat
     *         {@code null} as "no SSL configuration"
     */
    Properties getClientSslConfig() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the topology without an intermediate user topic, resets the application with the
     * reset tool, runs it again and asserts that the second run produces the same output
     * records as the first one.
     *
     * @throws Exception if any wait times out or the reset tool fails
     */
    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        final long groupTimeoutMs = TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT;
        final Properties sslConfig = getClientSslConfig();
        final Properties streamsConfig = prepareTest();

        final Properties resultConsumerConfig = new Properties();
        if (sslConfig != null) {
            resultConsumerConfig.putAll(sslConfig);
        }
        resultConsumerConfig.putAll(TestUtils.consumerConfig(
            bootstrapServers,
            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
            LongDeserializer.class,
            LongDeserializer.class));

        // first run
        final KafkaStreams firstRun = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
        final java.util.List<KeyValue<Long, Long>> expected;
        try {
            firstRun.start();
            expected = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        } finally {
            // always stop the instance so a failed wait does not leave it running
            firstRun.close();
        }
        TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
            "Streams Application consumer group did not time out after " + groupTimeoutMs + " ms.");

        // reset, then re-run
        final KafkaStreams secondRun = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
        final java.util.List<KeyValue<Long, Long>> actual;
        try {
            secondRun.cleanUp();
            cleanGlobal(null, sslConfig);
            TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
                "Reset Tool consumer group did not time out after " + groupTimeoutMs + " ms.");
            assertInternalTopicsGotDeleted(null);

            secondRun.start();
            actual = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
        } finally {
            secondRun.close();
        }

        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(expected));

        TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
            "Reset Tool consumer group did not time out after " + groupTimeoutMs + " ms.");
        cleanGlobal(null, sslConfig);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the topology that routes through the intermediate user topic, writes one extra
     * record to that topic, resets the application with the reset tool, runs it again and
     * asserts that both outputs of the second run equal those of the first run.
     *
     * @throws Exception if any wait times out or the reset tool fails
     */
    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        cluster.createTopic(INTERMEDIATE_USER_TOPIC);

        final long groupTimeoutMs = TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT;
        final Properties sslConfig = getClientSslConfig();
        final Properties streamsConfig = prepareTest();

        final Properties resultConsumerConfig = new Properties();
        if (sslConfig != null) {
            resultConsumerConfig.putAll(sslConfig);
        }
        resultConsumerConfig.putAll(TestUtils.consumerConfig(
            bootstrapServers,
            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
            LongDeserializer.class,
            LongDeserializer.class));

        // first run
        final KafkaStreams firstRun = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig);
        final java.util.List<KeyValue<Long, Long>> expectedCounts;
        final java.util.List<KeyValue<Long, Long>> expectedWindowedCounts;
        try {
            firstRun.start();
            expectedCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
            expectedWindowedCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);
        } finally {
            // always stop the instance so a failed wait does not leave it running
            firstRun.close();
        }
        TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
            "Streams Application consumer group did not time out after " + groupTimeoutMs + " ms.");

        // Write one more record to the intermediate topic after the first run.
        // NOTE(review): judging by its value the record is expected to be skipped by the
        // re-run after the reset — confirm against the reset tool's handling of intermediate topics.
        mockTime.sleep(1L);
        // Copy instead of aliasing: sslConfig is passed to cleanGlobal below and must not
        // pick up the producer settings.
        final Properties producerConfig = new Properties();
        if (sslConfig != null) {
            producerConfig.putAll(sslConfig);
        }
        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INTERMEDIATE_USER_TOPIC,
            Collections.singleton(new KeyValue<Long, String>(-1L, "badRecord-ShouldBeSkipped")),
            producerConfig,
            Long.valueOf(mockTime.milliseconds()));

        // reset, then re-run writing the windowed counts to a separate topic
        final KafkaStreams secondRun = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
        final java.util.List<KeyValue<Long, Long>> actualCounts;
        final java.util.List<KeyValue<Long, Long>> actualWindowedCounts;
        try {
            secondRun.cleanUp();
            cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
            TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
                "Reset Tool consumer group did not time out after " + groupTimeoutMs + " ms.");
            assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);

            secondRun.start();
            actualCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
            actualWindowedCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
        } finally {
            secondRun.close();
        }

        MatcherAssert.assertThat(actualCounts, CoreMatchers.equalTo(expectedCounts));
        MatcherAssert.assertThat(actualWindowedCounts, CoreMatchers.equalTo(expectedWindowedCounts));

        TestUtils.waitForCondition(this.consumerGroupInactive, groupTimeoutMs,
            "Reset Tool consumer group did not time out after " + groupTimeoutMs + " ms.");
        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);

        cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
    }

    /**
     * Builds the Streams configuration for the current test and purges any local state
     * left in the configured state directory.
     *
     * <p>The SSL settings from {@link #getClientSslConfig()} are copied into a fresh
     * {@link Properties} object, so the object returned by that hook is never modified.
     *
     * @return the Streams configuration, including SSL settings when available
     * @throws IOException if purging the local state directory fails
     */
    private Properties prepareTest() throws IOException {
        final Properties streamsConfig = new Properties();
        final Properties sslConfig = getClientSslConfig();
        if (sslConfig != null) {
            streamsConfig.putAll(sslConfig);
        }

        streamsConfig.put("application.id", APP_ID + testNo);
        streamsConfig.put("bootstrap.servers", bootstrapServers);
        streamsConfig.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfig.put("default.key.serde", Serdes.Long().getClass());
        streamsConfig.put("default.value.serde", Serdes.String().getClass());
        streamsConfig.put("cache.max.bytes.buffering", 0);
        streamsConfig.put("commit.interval.ms", 100);
        streamsConfig.put("heartbeat.interval.ms", 100);
        streamsConfig.put("session.timeout.ms", String.valueOf(STREAMS_CONSUMER_TIMEOUT));
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
        return streamsConfig;
    }

    /**
     * Re-creates the input and output topics and writes ten records to the input topic.
     *
     * <p>Keys alternate between {@code 0} and {@code 1}. The mock clock is advanced by 10 ms
     * before each of the first six records and by 1 ms before each of the remaining four;
     * every record is stamped with the mock clock's current time.
     *
     * @throws Exception if topic re-creation or producing a record fails
     */
    private void prepareInputData() throws Exception {
        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

        // Copy the SSL settings so the object returned by getClientSslConfig() is not modified.
        final Properties producerConfig = new Properties();
        final Properties sslConfig = getClientSslConfig();
        if (sslConfig != null) {
            producerConfig.putAll(sslConfig);
        }
        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));

        final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
        // the first six records are 10 ms apart, the remaining ones 1 ms apart
        final int widelySpacedRecords = 6;

        for (int i = 0; i < values.length; i++) {
            mockTime.sleep(i < widelySpacedRecords ? 10L : 1L);
            final long key = i % 2;
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                INPUT_TOPIC,
                Collections.singleton(new KeyValue<Long, String>(key, values[i])),
                producerConfig,
                Long.valueOf(mockTime.milliseconds()));
        }
    }

    /**
     * Builds a topology with two branches reading from the input topic:
     * <ol>
     *   <li>a per-key count written to {@code OUTPUT_TOPIC};</li>
     *   <li>a windowed per-key count (window size 35, advance 10) that is routed through
     *       {@code INTERMEDIATE_USER_TOPIC} and written to the given topic, keyed by the sum
     *       of the window's start and end.</li>
     * </ol>
     *
     * @param str name of the topic that receives the windowed counts
     * @return the built topology
     */
    private Topology setupTopologyWithIntermediateUserTopic(String str) {
        final StreamsBuilder builder = new StreamsBuilder();
        // typed explicitly so the mappers below are checked against the stream's key/value types
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        // identity map followed by a per-key count
        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
            @Override
            public KeyValue<Long, String> apply(Long key, String value) {
                return new KeyValue<>(key, value);
            }
        }).groupByKey().count().toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        input.through(INTERMEDIATE_USER_TOPIC)
            .groupByKey()
            .windowedBy(TimeWindows.of(35L).advanceBy(10L))
            .count()
            .toStream()
            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
                @Override
                public KeyValue<Long, Long> apply(Windowed<Long> windowedKey, Long count) {
                    // replace the windowed key by (window start + window end)
                    return new KeyValue<>(Long.valueOf(windowedKey.window().start() + windowedKey.window().end()), count);
                }
            })
            .to(str, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Builds a topology that reads the input topic, maps every record to
     * {@code (key, key)} and writes the result to {@code OUTPUT_TOPIC}.
     *
     * @return the built topology
     */
    private Topology setupTopologyWithoutIntermediateUserTopic() {
        final StreamsBuilder builder = new StreamsBuilder();
        // An explicitly typed stream is required: without it the key/value types cannot be
        // inferred and the Long/String mapper below does not type-check.
        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);

        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
            @Override
            public KeyValue<Long, Long> apply(Long key, String value) {
                // the value is dropped on purpose; the key is emitted as both key and value
                return new KeyValue<>(key, key);
            }
        }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Runs {@link StreamsResetter} for the current application id and asserts that it exits
     * with code {@code 0}.
     *
     * <p>When SSL settings are supplied they are written to a temporary config file that is
     * handed to the tool via {@code --config-file}; this now also happens when an
     * intermediate topic is given (previously the SSL settings were ignored in that case).
     *
     * @param str        intermediate user topic to reset, or {@code null} if there is none
     * @param properties SSL client settings, or {@code null} when SSL is not used; only
     *                   {@code ssl.truststore.location} and {@code ssl.truststore.password} are read
     * @throws Exception if the config file cannot be written or the tool fails
     */
    private void cleanGlobal(String str, Properties properties) throws Exception {
        final java.util.List<String> arguments = new java.util.ArrayList<>(java.util.Arrays.asList(
            "--application-id", APP_ID + testNo,
            "--bootstrap-servers", bootstrapServers,
            "--input-topics", INPUT_TOPIC));

        if (str != null) {
            arguments.add("--intermediate-topics");
            arguments.add(str);
            // NOTE(review): --zookeeper is kept from the original argument list; presumably
            // to check that the tool still accepts the option — confirm.
            arguments.add("--zookeeper");
            arguments.add("localhost:2181");
        }

        if (properties != null) {
            final File configFile = TestUtils.tempFile();
            // try-with-resources: the writer is closed even if a write fails
            try (BufferedWriter writer = new BufferedWriter(new FileWriter(configFile))) {
                writer.write("security.protocol=SSL\n");
                writer.write("ssl.truststore.location=" + properties.get("ssl.truststore.location") + "\n");
                writer.write("ssl.truststore.password=" + properties.get("ssl.truststore.password") + "\n");
            }
            arguments.add("--config-file");
            arguments.add(configFile.getAbsolutePath());
        }

        final String[] strArr = arguments.toArray(new String[0]);

        final Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", 100);
        cleanUpConfig.put("session.timeout.ms", String.valueOf(CLEANUP_CONSUMER_TIMEOUT));

        log.info("Calling StreamsResetter with parameters {} and configs {}", strArr, cleanUpConfig);
        Assert.assertEquals(0L, new StreamsResetter().run(strArr, cleanUpConfig));
    }

    /**
     * Waits until the cluster's remaining topics are the input/output topics,
     * {@code __consumer_offsets} and, when given, the intermediate user topic.
     *
     * @param str intermediate user topic expected to remain, or {@code null} if there is none
     * @throws Exception if the wait on the cluster fails
     */
    private void assertInternalTopicsGotDeleted(String str) throws Exception {
        if (str == null) {
            cluster.waitForRemainingTopics(
                IntegrationTestUtils.DEFAULT_TIMEOUT,
                INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
                "__consumer_offsets");
            return;
        }
        cluster.waitForRemainingTopics(
            IntegrationTestUtils.DEFAULT_TIMEOUT,
            INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
            "__consumer_offsets", str);
    }
}
