package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.class */
public class FineGrainedAutoResetIntegrationTest {
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
    private static final String TOPIC_1_0 = "topic-1_0";
    private static final String TOPIC_2_0 = "topic-2_0";
    private static final String TOPIC_A_0 = "topic-A_0";
    private static final String TOPIC_C_0 = "topic-C_0";
    private static final String TOPIC_Y_0 = "topic-Y_0";
    private static final String TOPIC_Z_0 = "topic-Z_0";
    private static final String TOPIC_1_1 = "topic-1_1";
    private static final String TOPIC_2_1 = "topic-2_1";
    private static final String TOPIC_A_1 = "topic-A_1";
    private static final String TOPIC_C_1 = "topic-C_1";
    private static final String TOPIC_Y_1 = "topic-Y_1";
    private static final String TOPIC_Z_1 = "topic-Z_1";
    private static final String TOPIC_1_2 = "topic-1_2";
    private static final String TOPIC_2_2 = "topic-2_2";
    private static final String TOPIC_A_2 = "topic-A_2";
    private static final String TOPIC_C_2 = "topic-C_2";
    private static final String TOPIC_Y_2 = "topic-Y_2";
    private static final String TOPIC_Z_2 = "topic-Z_2";
    private static final String NOOP = "noop";
    private Properties streamsConfiguration;
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
    private final MockTime mockTime = CLUSTER.time;
    private final Serde<String> stringSerde = Serdes.String();
    private final String topic1TestMessage = "topic-1 test";
    private final String topic2TestMessage = "topic-2 test";
    private final String topicATestMessage = "topic-A test";
    private final String topicCTestMessage = "topic-C test";
    private final String topicYTestMessage = "topic-Y test";
    private final String topicZTestMessage = "topic-Z test";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest$TestingUncaughtExceptionHandler.class */
    public static final class TestingUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
        boolean correctExceptionThrown;

        private TestingUncaughtExceptionHandler() {
            this.correctExceptionThrown = false;
        }

        public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handle(Throwable th) {
            MatcherAssert.assertThat(th.getClass().getSimpleName(), CoreMatchers.is("StreamsException"));
            MatcherAssert.assertThat(th.getCause().getClass().getSimpleName(), CoreMatchers.is("NoOffsetForPartitionException"));
            this.correctExceptionThrown = true;
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        }
    }

    @BeforeAll
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopics(TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, NOOP, DEFAULT_OUTPUT_TOPIC, OUTPUT_TOPIC_0, OUTPUT_TOPIC_1, OUTPUT_TOPIC_2);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setUp() throws IOException {
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("commit.interval.ms", 100L);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("testAutoOffsetId", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
        this.streamsConfiguration.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "latest");
        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, Arrays.asList("topic-1 test", "topic-2 test"));
    }

    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, Arrays.asList("topic-1 test", "topic-2 test", "topic-Y test", "topic-Z test"));
    }

    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
        commitInvalidOffsets();
        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, Arrays.asList("topic-1 test", "topic-2 test", "topic-Y test", "topic-Z test"));
    }

    private void shouldOnlyReadForEarliest(String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, List<String> list) throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(Pattern.compile("topic-\\d" + str), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        KStream stream2 = streamsBuilder.stream(Pattern.compile("topic-[A-D]" + str), Consumed.with(Topology.AutoOffsetReset.LATEST));
        KStream stream3 = streamsBuilder.stream(Arrays.asList(str6, str7));
        stream.to(str8, Produced.with(this.stringSerde, this.stringSerde));
        stream2.to(str8, Produced.with(this.stringSerde, this.stringSerde));
        stream3.to(str8, Produced.with(this.stringSerde, this.stringSerde));
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(str2, Collections.singletonList("topic-1 test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(str3, Collections.singletonList("topic-2 test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(str4, Collections.singletonList("topic-A test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(str5, Collections.singletonList("topic-C test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(str6, Collections.singletonList("topic-Y test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(str7, Collections.singletonList("topic-Z test"), producerConfig, this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), this.streamsConfiguration);
        kafkaStreams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, str8, list.size());
        ArrayList arrayList = new ArrayList(list.size());
        Iterator it = waitUntilMinKeyValueRecordsReceived.iterator();
        while (it.hasNext()) {
            arrayList.add(((KeyValue) it.next()).value);
        }
        kafkaStreams.close();
        Collections.sort(arrayList);
        Collections.sort(list);
        MatcherAssert.assertThat(arrayList, CoreMatchers.equalTo(list));
    }

    private void commitInvalidOffsets() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), "commit_invalid_offset_app", StringDeserializer.class, StringDeserializer.class));
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5L, (String) null));
        hashMap.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5L, (String) null));
        hashMap.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5L, (String) null));
        hashMap.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5L, (String) null));
        hashMap.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5L, (String) null));
        hashMap.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5L, (String) null));
        kafkaConsumer.commitSync(hashMap);
        kafkaConsumer.close();
    }

    @Test
    public void shouldThrowExceptionOverlappingPattern() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        try {
            streamsBuilder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.LATEST));
            streamsBuilder.build();
            Assertions.fail("Should have thrown TopologyException");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldThrowExceptionOverlappingTopic() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
        try {
            streamsBuilder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), Consumed.with(Topology.AutoOffsetReset.LATEST));
            streamsBuilder.build();
            Assertions.fail("Should have thrown TopologyException");
        } catch (TopologyException e) {
        }
    }

    @Test
    public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("commit.interval.ms", 100L);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "none");
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig("testAutoOffsetWithNone", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(NOOP).to(DEFAULT_OUTPUT_TOPIC, Produced.with(this.stringSerde, this.stringSerde));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfig);
        TestingUncaughtExceptionHandler testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        kafkaStreams.setUncaughtExceptionHandler(testingUncaughtExceptionHandler);
        kafkaStreams.start();
        TestUtils.waitForCondition(() -> {
            return testingUncaughtExceptionHandler.correctExceptionThrown;
        }, "The expected NoOffsetForPartitionException was never thrown");
        kafkaStreams.close();
    }
}
