/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that runs small aggregation topologies (reduce, windowed reduce and
 * windowed count) against an embedded single-broker Kafka cluster with a non-zero
 * state-store cache configured, and checks the records that end up on the output topic.
 *
 * <p>Each test gets its own input/output topics and application id, derived from the
 * test name, so the tests do not interfere with each other on the shared cluster.
 */
@Timeout(600)
@Tag("integration")
public class KStreamAggregationDedupIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;
    /** Size of the tumbling windows used by the windowed tests, in milliseconds. */
    private static final long WINDOW_SIZE_MS = 500L;
    /** Value stored under {@code statestore.cache.max.bytes} (10 MiB). */
    private static final long CACHE_SIZE_BYTES = 10 * 1024 * 1024L;

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private final MockTime mockTime = CLUSTER.time;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private KStream<Integer, String> stream;

    /** Starts the shared embedded cluster once before any test runs. */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster after all tests have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates the per-test topics, the Streams configuration and the common part of the
     * topology: the input stream re-grouped by its value, plus a reducer that joins two
     * values with {@code ":"}.
     *
     * @param testInfo JUnit test metadata, used to derive unique topic and application names
     * @throws InterruptedException if topic creation is interrupted
     */
    @BeforeEach
    public void before(final TestInfo testInfo) throws InterruptedException {
        builder = new StreamsBuilder();
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        createTopics(safeTestName);

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", COMMIT_INTERVAL_MS);
        streamsConfiguration.put("statestore.cache.max.bytes", CACHE_SIZE_BYTES);

        // Typed explicitly so that the generic DSL calls below keep their type parameters.
        final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
        stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));
        reducer = (value1, value2) -> value1 + ":" + value2;
    }

    /**
     * Closes the Streams instance (if one was started) and removes its local state.
     *
     * <p>The purge runs in a {@code finally} block so the state directory is cleaned up even
     * when closing fails, and both steps are null-guarded because the fields are only
     * assigned part-way through {@link #before(TestInfo)} / {@link #startStreams()}.
     *
     * @throws IOException if purging the local state directory fails
     */
    @AfterEach
    public void whenShuttingDown() throws IOException {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
        } finally {
            if (streamsConfiguration != null) {
                IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
            }
        }
    }

    /**
     * Produces one batch before the topology starts and one after, reduces by key, and
     * expects {@code "X:X"} per key carrying the timestamp of the second batch.
     */
    @Test
    public void shouldReduce(final TestInfo testInfo) throws Exception {
        produceMessages(System.currentTimeMillis());
        groupedStream
            .reduce(reducer, Materialized.as("reduce-by-key"))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        startStreams();

        final long timestamp = System.currentTimeMillis();
        produceMessages(timestamp);

        validateReceivedMessages(
            new StringDeserializer(),
            new StringDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("A", "A:A", timestamp),
                new KeyValueTimestamp<>("B", "B:B", timestamp),
                new KeyValueTimestamp<>("C", "C:C", timestamp),
                new KeyValueTimestamp<>("D", "D:D", timestamp),
                new KeyValueTimestamp<>("E", "E:E", timestamp)),
            testInfo);
    }

    /**
     * Produces one batch one second in the past and two batches "now", reduces per
     * {@value #WINDOW_SIZE_MS} ms window, and expects a single value in the earlier window
     * and a reduced {@code "X:X"} value in the later window for every key.
     */
    @Test
    public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
        final long firstBatchTimestamp = System.currentTimeMillis() - 1000L;
        produceMessages(firstBatchTimestamp);
        final long secondBatchTimestamp = System.currentTimeMillis();
        produceMessages(secondBatchTimestamp);
        produceMessages(secondBatchTimestamp);

        // NOTE(review): TimeWindows.of(Duration) appears to be deprecated in recent Kafka
        // releases; it is kept because the replacements change the grace period - confirm
        // against the Kafka version this test targets before switching.
        groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
            .reduce(reducer, Materialized.as("reduce-time-windows"))
            .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        startStreams();

        final long firstBatchWindow = windowStart(firstBatchTimestamp);
        final long secondBatchWindow = windowStart(secondBatchTimestamp);

        validateReceivedMessages(
            new StringDeserializer(),
            new StringDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp),
                new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp),
                new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp),
                new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp),
                new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp),
                new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp),
                new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp),
                new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
                new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
                new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)),
            testInfo);
    }

    /**
     * Produces the same batch twice with one timestamp, counts per original integer key and
     * {@value #WINDOW_SIZE_MS} ms window, and expects a count of 2 for every key.
     */
    @Test
    public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
        final long timestamp = mockTime.milliseconds();
        produceMessages(timestamp);
        produceMessages(timestamp);

        stream
            .groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE_MS)))
            .count(Materialized.as("count-windows"))
            .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        startStreams();

        final long window = windowStart(timestamp);

        validateReceivedMessages(
            new StringDeserializer(),
            new LongDeserializer(),
            Arrays.asList(
                new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
                new KeyValueTimestamp<>("5@" + window, 2L, timestamp)),
            testInfo);
    }

    /**
     * Rounds a timestamp down to a multiple of {@link #WINDOW_SIZE_MS}, which is the value
     * the tests expect as the start of the window containing that timestamp.
     */
    private static long windowStart(final long timestamp) {
        return timestamp / WINDOW_SIZE_MS * WINDOW_SIZE_MS;
    }

    /**
     * Synchronously writes the fixed records {@code 1->A .. 5->E} to the input topic, all
     * carrying the given timestamp.
     *
     * @param timestamp record timestamp applied to every produced record
     */
    private void produceMessages(final long timestamp) throws Exception {
        final List<KeyValue<Integer, String>> records = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E"));
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            records,
            producerConfig,
            timestamp);
    }

    /**
     * Derives the per-test topic names and creates them on the cluster; the input topic is
     * created with the arguments {@code (3, 1)}, the output topic with the cluster defaults.
     *
     * @param safeTestName unique, topic-name-safe identifier of the current test
     */
    private void createTopics(final String safeTestName) throws InterruptedException {
        streamOneInput = "stream-one-" + safeTestName;
        outputTopic = "output-" + safeTestName;
        CLUSTER.createTopic(streamOneInput, 3, 1);
        CLUSTER.createTopic(outputTopic);
    }

    /** Builds the topology accumulated in {@link #builder} and starts a Streams instance for it. */
    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    /**
     * Consumes the output topic from the earliest offset and waits until the expected final
     * records have been received.
     *
     * @param keyDeserializer   deserializer whose class is configured as the consumer's key deserializer
     * @param valueDeserializer deserializer whose class is configured as the consumer's value deserializer
     * @param expectedRecords   records handed to the wait helper as the expected result
     * @param testInfo          JUnit test metadata, used to derive a unique consumer group id
     */
    private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
                                                 final Deserializer<V> valueDeserializer,
                                                 final List<KeyValueTimestamp<K, V>> expectedRecords,
                                                 final TestInfo testInfo) throws Exception {
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + safeTestName);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        // Only the deserializer classes are used; the consumer instantiates its own instances.
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());

        IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
            consumerProperties,
            outputTopic,
            expectedRecords);
    }
}

