package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test, run once per exactly-once processing guarantee ({@code exactly_once} and
 * {@code exactly_once_v2}), that crashes a Kafka Streams application from inside its topology and
 * then inspects what is left of the {@code 0_0} directory below the application's state directory.
 *
 * <p>The test passes when, after the crash, that directory is gone, or it still holds files but no
 * {@code .checkpoint} file, or its {@code .checkpoint} file is empty.
 */
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class EOSUncleanShutdownIntegrationTest {

    /** Upper bound for a single test run. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    /** Value used for {@code processing.guarantee}; injected by the parameterized runner. */
    @Parameterized.Parameter
    public String eosConfig;

    /** Number of brokers in the embedded cluster. */
    private static final int NUM_BROKERS = 3;

    /** Number of records after which the topology throws. */
    private static final int RECORD_TOTAL = 3;

    private static final String APP_ID = "shouldWorkWithUncleanShutdownWipeOutStateStore";
    private static final String INPUT_TOPIC = "input-topic";

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    @ClassRule
    public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory());
    private static final Properties STREAMS_CONFIG = new Properties();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Long COMMIT_INTERVAL = 100L;

    /**
     * Supplies the processing guarantees the test is executed with.
     *
     * @return one single-element array per {@code processing.guarantee} value
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<String[]> data() {
        return Arrays.asList(new String[]{"exactly_once"}, new String[]{"exactly_once_v2"});
    }

    /**
     * Starts the embedded cluster and fills in the Streams settings shared by all parameter runs.
     *
     * @throws IOException if the embedded cluster fails to start
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
        STREAMS_CONFIG.put("state.dir", TEST_FOLDER.getRoot().getPath());
    }

    /** Stops the embedded cluster once all parameter runs have finished. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Produces {@link #RECORD_TOTAL} records into an aggregating topology that throws on the last
     * one, waits for the application to reach {@code ERROR}, and then checks the leftover state
     * directory.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a condition
     */
    @Test
    public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
        STREAMS_CONFIG.put("application.id", APP_ID);
        STREAMS_CONFIG.put("processing.guarantee", this.eosConfig);
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, INPUT_TOPIC);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);
        final AtomicInteger recordCount = new AtomicInteger(0);

        inputStream
            .groupByKey()
            .aggregate(
                () -> "()",
                (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
                Materialized.as("aggregated_value"))
            .toStream()
            .peek((key, value) -> {
                // Simulate an unclean shutdown: blow up once the last expected record arrives.
                if (recordCount.incrementAndGet() >= RECORD_TOTAL) {
                    throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record");
                }
            });

        final Map<String, String> producerSettings = Utils.mkMap(
            Utils.mkEntry("client.id", "anything"),
            Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()));
        final Properties producerConfig = Utils.mkProperties(producerSettings);

        final KafkaStreams driver = new KafkaStreams(builder.build(), STREAMS_CONFIG);
        driver.cleanUp();
        driver.start();

        // <state.dir>/<application.id>/0_0 and the checkpoint file inside it.
        final File taskStateDir = new File(String.join("/", TEST_FOLDER.getRoot().getPath(), APP_ID, "0_0"));
        final File taskCheckpointFile = new File(taskStateDir, ".checkpoint");

        try {
            IntegrationTestUtils.produceSynchronously(
                producerConfig,
                false,
                INPUT_TOPIC,
                Optional.empty(),
                Collections.singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));

            // Do not send the crashing records before the first one has left files on disk.
            TestUtils.waitForCondition(
                () -> containsFiles(taskStateDir),
                "Expected the task state directory to be created and populated after the first record");

            IntegrationTestUtils.produceSynchronously(
                producerConfig,
                false,
                INPUT_TOPIC,
                Optional.empty(),
                Arrays.asList(
                    new KeyValueTimestamp<>("k2", "v2", 1L),
                    new KeyValueTimestamp<>("k3", "v3", 2L)));

            // The message is supplied lazily so it reports the count seen when the wait gives up.
            TestUtils.waitForCondition(
                () -> recordCount.get() == RECORD_TOTAL,
                () -> "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get());
        } finally {
            // Runs on success and on failure alike, so the driver is always closed and cleaned up.
            TestUtils.waitForCondition(
                () -> driver.state().equals(KafkaStreams.State.ERROR),
                () -> "Expected ERROR state but driver is on " + driver.state());

            driver.close();

            // Accepted outcomes after the crash:
            //   1. the state directory is gone;
            //   2. the state directory still has files, but no checkpoint file;
            //   3. a checkpoint file exists, but it is empty.
            Assert.assertTrue(
                !taskStateDir.exists()
                    || (containsFiles(taskStateDir) && !taskCheckpointFile.exists())
                    || (taskCheckpointFile.exists() && taskCheckpointFile.length() == 0));

            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Tells whether {@code directory} is a directory with at least one entry.
     *
     * <p>{@link File#list()} returns {@code null} when the path is not a directory or cannot be
     * read, which can happen if the directory is removed concurrently; that case is reported as
     * {@code false} instead of failing with a {@link NullPointerException}.
     *
     * @param directory the path to inspect
     * @return {@code true} if the path is a directory containing at least one entry
     */
    private static boolean containsFiles(final File directory) {
        final String[] entries = directory.list();
        return entries != null && entries.length > 0;
    }
}
