package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.class */
public class EmitOnChangeIntegrationTest {

    @Rule
    public TestName testName = new TestName();
    private static String inputTopic;
    private static String outputTopic;
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static String appId = "";

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        appId = "appId_" + safeUniqueTestName;
        inputTopic = "input" + safeUniqueTestName;
        outputTopic = "output" + safeUniqueTestName;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
    }

    @Test
    public void shouldEmitSameRecordAfterFailover() throws Exception {
        Properties mkObjectProperties = Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("application.id", appId), Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()), Utils.mkEntry("num.stream.threads", 1), Utils.mkEntry("cache.max.bytes.buffering", 0), Utils.mkEntry("commit.interval.ms", 300000L), Utils.mkEntry("default.key.serde", Serdes.IntegerSerde.class), Utils.mkEntry("default.value.serde", Serdes.StringSerde.class), Utils.mkEntry("session.timeout.ms", 10000)}));
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(inputTopic, Materialized.as("test-store")).toStream().map((obj, obj2) -> {
            if (atomicBoolean.compareAndSet(true, false)) {
                throw new RuntimeException("Kaboom");
            }
            return new KeyValue(obj, obj2);
        }).to(outputTopic);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), mkObjectProperties);
        Throwable th = null;
        try {
            kafkaStreams.setUncaughtExceptionHandler(th2 -> {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(inputTopic, Arrays.asList(new KeyValue(1, "A"), new KeyValue(1, "B")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), 0L);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), outputTopic, Arrays.asList(new KeyValue(1, "A"), new KeyValue(1, "B")));
            if (kafkaStreams != null) {
                if (0 == 0) {
                    kafkaStreams.close();
                    return;
                }
                try {
                    kafkaStreams.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (kafkaStreams != null) {
                if (0 != 0) {
                    try {
                        kafkaStreams.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaStreams.close();
                }
            }
            throw th4;
        }
    }
}
