/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that starts a Kafka Streams application materializing a source topic as a
 * {@link KTable}, shuts it down, starts it again and verifies that the records observed through
 * the table match the expected key/value pairs, both when extra records are produced from the
 * restore-start callback and when they are not.
 *
 * <p>{@link #STREAMS_CONFIG} and {@link #PRODUCER_CONFIG} are static and shared by all tests, so
 * any per-test setting written into them must be reset in {@link #before(TestInfo)}.
 */
@Timeout(600)
@Tag("integration")
public class KTableSourceTopicRestartIntegrationTest {
    private static final int NUM_BROKERS = 3;
    private static final String SOURCE_TOPIC = "source-topic";
    private static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    private static final long ASSERT_TIMEOUT_MS = 30000L;
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(5L);
    private static final Properties PRODUCER_CONFIG = new Properties();
    private static final Properties STREAMS_CONFIG = new Properties();
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private final Time time;
    private final StreamsBuilder streamsBuilder;
    // Written by the topology's foreach callback and polled by the test thread.
    private final Map<String, String> readKeyValues;
    private String sourceTopic;
    private KafkaStreams streams;
    private Map<String, String> expectedInitialResultsMap;
    private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;

    public KTableSourceTopicRestartIntegrationTest() {
        this.time = CLUSTER.time;
        this.streamsBuilder = new StreamsBuilder();
        this.readKeyValues = new ConcurrentHashMap<>();
    }

    /**
     * Starts the embedded cluster and fills in the settings shared by every test.
     *
     * @throws IOException if the cluster cannot be started
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();

        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        STREAMS_CONFIG.put("statestore.cache.max.bytes", 0);
        STREAMS_CONFIG.put("commit.interval.ms", 5L);
        STREAMS_CONFIG.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
        STREAMS_CONFIG.put("session.timeout.ms", 1000);
        STREAMS_CONFIG.put("heartbeat.interval.ms", 300);

        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
    }

    /** Stops the embedded cluster once all tests have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates a per-test source topic and application id, wires the table topology and prepares
     * the expected result maps.
     *
     * @param testInfo JUnit test metadata used to derive a unique, safe name
     * @throws Exception if the topic cannot be created
     */
    @BeforeEach
    public void before(final TestInfo testInfo) throws Exception {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        sourceTopic = SOURCE_TOPIC + "-" + safeTestName;
        CLUSTER.createTopic(sourceTopic);

        STREAMS_CONFIG.put("application.id", safeTestName);
        // STREAMS_CONFIG is static: drop any guarantee left behind by a previously executed EOS
        // test so that tests not setting it do not depend on execution order.
        STREAMS_CONFIG.remove(PROCESSING_GUARANTEE_CONFIG);

        final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store"));
        kTable.toStream().foreach(readKeyValues::put);

        expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
        expectedResultsWithDataWrittenDuringRestoreMap =
            createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
    }

    /**
     * Purges the local Streams state directory used by the test.
     *
     * @throws Exception if purging the state fails
     */
    @AfterEach
    public void after() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
        runRestartScenario(true, expectedResultsWithDataWrittenDuringRestoreMap);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception {
        STREAMS_CONFIG.put(PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        runRestartScenario(true, expectedResultsWithDataWrittenDuringRestoreMap);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2Enabled() throws Exception {
        STREAMS_CONFIG.put(PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
        runRestartScenario(true, expectedResultsWithDataWrittenDuringRestoreMap);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
        runRestartScenario(false, createExpectedResultsMap("a", "b", "c", "f", "g", "h"));
    }

    /**
     * Runs the scenario shared by all tests: start the application, produce {@code a, b, c},
     * wait for them, close, start a fresh instance, produce {@code f, g, h} and wait for
     * {@code expectedAfterRestart}.
     *
     * @param writeDuringRestore   when {@code true}, the restarted instance gets a restore
     *                             listener that produces key {@code d} from its restore-start
     *                             callback
     * @param expectedAfterRestart key/value pairs that must have been read after the restart
     * @throws Exception if a wait condition times out or the application fails to start
     */
    private void runRestartScenario(final boolean writeDuringRestore,
                                    final Map<String, String> expectedAfterRestart) throws Exception {
        try {
            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
            streams.start();
            produceKeyValues("a", "b", "c");
            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
            streams.close();

            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
            if (writeDuringRestore) {
                streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
            }
            streams.start();
            produceKeyValues("f", "g", "h");
            assertNumberValuesRead(readKeyValues, expectedAfterRestart, "Table did not get all values after restart");
        } finally {
            // The field stays null if the first KafkaStreams constructor throws; guarding here
            // keeps that original failure from being masked by a NullPointerException.
            if (streams != null) {
                streams.close(CLOSE_TIMEOUT);
            }
        }
    }

    /**
     * Waits up to {@link #ASSERT_TIMEOUT_MS} milliseconds for {@code valueMap} to equal
     * {@code expectedMap}.
     *
     * @param valueMap     map that is being populated by the running topology
     * @param expectedMap  contents the map must eventually have
     * @param errorMessage message reported if the condition is not met in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void assertNumberValuesRead(final Map<String, String> valueMap,
                                        final Map<String, String> expectedMap,
                                        final String errorMessage) throws InterruptedException {
        TestUtils.waitForCondition(() -> valueMap.equals(expectedMap), ASSERT_TIMEOUT_MS, errorMessage);
    }

    /**
     * Synchronously produces one record per key to the source topic; each value is the key with
     * {@code "1"} appended.
     *
     * @param keys keys of the records to produce
     */
    private void produceKeyValues(final String... keys) {
        final ArrayList<KeyValue<String, String>> keyValueList = new ArrayList<>(keys.length);
        for (final String key : keys) {
            keyValueList.add(new KeyValue<>(key, key + "1"));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(sourceTopic, keyValueList, PRODUCER_CONFIG, time);
    }

    /**
     * Builds the map the table is expected to contain for the given keys, using the same
     * key-to-value rule as {@link #produceKeyValues(String...)}.
     *
     * @param keys keys expected to be present
     * @return a new mutable map from each key to the key with {@code "1"} appended
     */
    private Map<String, String> createExpectedResultsMap(final String... keys) {
        final Map<String, String> expectedMap = new HashMap<>();
        for (final String key : keys) {
            expectedMap.put(key, key + "1");
        }
        return expectedMap;
    }

    /**
     * Restore listener that produces a record with key {@code d} to the source topic each time
     * the restore-start callback is invoked; the other callbacks do nothing.
     */
    private class UpdatingSourceTopicOnRestoreStartStateRestoreListener implements StateRestoreListener {

        @Override
        public void onRestoreStart(final TopicPartition topicPartition,
                                   final String storeName,
                                   final long startingOffset,
                                   final long endingOffset) {
            produceKeyValues("d");
        }

        @Override
        public void onBatchRestored(final TopicPartition topicPartition,
                                    final String storeName,
                                    final long batchEndOffset,
                                    final long numRestored) {
            // intentionally empty: only the start of restoration matters for this test
        }

        @Override
        public void onRestoreEnd(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long totalRestored) {
            // intentionally empty: only the start of restoration matters for this test
        }
    }
}

