/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class GlobalStateReprocessTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();
    public static final EmbeddedKafkaCluster CLUSTER;
    private final MockTime mockTime;
    private final String globalStore = "globalStore";
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalStoreTopic;

    public GlobalStateReprocessTest() {
        this.mockTime = GlobalStateReprocessTest.CLUSTER.time;
        this.globalStore = "globalStore";
    }

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws Exception {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.streamsConfiguration.put("application.id", "app-" + safeTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("statestore.cache.max.bytes", (Object)0);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100L);
        final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder(Stores.persistentKeyValueStore((String)"globalStore"), Serdes.String(), Serdes.Long(), (Time)this.mockTime);
        ProcessorSupplier processorSupplier = () -> new ContextualProcessor<String, Long, Void, Void>(){

            public void process(Record<String, Long> record) {
                KeyValueStore stateStore = (KeyValueStore)this.context().getStateStore(storeBuilder.name());
                stateStore.put((Object)((String)record.key() + "- this is the right value."), record.value());
            }
        };
        this.builder.addGlobalStore((StoreBuilder)storeBuilder, this.globalStoreTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Long()), processorSupplier);
    }

    @AfterEach
    public void after() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReprocessWithUserProvidedStore() throws Exception {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.populateTopics(this.globalStoreTopic);
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> !this.storeContents(this.kafkaStreams).isEmpty(), (long)30000L, (String)"Has not processed record within 30 seconds");
        MatcherAssert.assertThat((Object)this.storeContents(this.kafkaStreams).get(0), (Matcher)Matchers.containsString((String)"- this is the right value."));
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> !this.storeContents(this.kafkaStreams).isEmpty(), (long)30000L, (String)"Has not processed record within 30 seconds");
        MatcherAssert.assertThat((Object)this.storeContents(this.kafkaStreams).get(0), (Matcher)Matchers.containsString((String)"- this is the right value."));
    }

    private void createTopics() throws Exception {
        this.globalStoreTopic = "global-store-topic";
        CLUSTER.createTopic(this.globalStoreTopic);
    }

    private void populateTopics(String topicName) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(topicName, Collections.singletonList(new KeyValue((Object)"A", (Object)1L)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, (Properties)new Properties()), (Time)this.mockTime);
    }

    private List<String> storeContents(KafkaStreams streams) {
        ArrayList<String> keySet = new ArrayList<String>();
        ReadOnlyKeyValueStore keyValueStore = (ReadOnlyKeyValueStore)streams.store(StoreQueryParameters.fromNameAndType((String)"globalStore", (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        KeyValueIterator range = keyValueStore.reverseAll();
        while (range.hasNext()) {
            keySet.add((String)((KeyValue)range.next()).key);
        }
        range.close();
        return keySet;
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (Object)1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", (Object)1);
        CLUSTER = new EmbeddedKafkaCluster(1, BROKER_CONFIG);
    }
}

