/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamReduce;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KGroupedStreamImplTest {
    // Name of the input topic that before() consumes and that the helpers below pipe records into.
    private static final String TOPIC = "topic";
    // Store name handed to Materialized.as(...) by the tests that expect a TopologyException.
    private static final String INVALID_STORE_NAME = "~foo bar~";
    // Builder on which each test assembles its topology; built via builder.build() for the test driver.
    private final StreamsBuilder builder = new StreamsBuilder();
    // Stream under test: TOPIC grouped by key. Assigned in before().
    private KGroupedStream<String, String> groupedStream;
    // Configuration obtained from StreamsTestUtils (called with String serdes) and passed to TopologyTestDriver.
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Creates the grouped stream under test: reads {@code TOPIC} with String serdes
     * and groups it by key, again with String serdes.
     */
    @Before
    public void before() {
        final KStream<String, String> stream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
    }

    /** Expects a NullPointerException when {@code cogroup} is given a null aggregator. */
    @Test
    public void shouldNotHaveNullAggregatorOnCogroup() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.cogroup(null));
    }

    /** Expects a NullPointerException when {@code reduce} is given a null reducer. */
    @Test
    public void shouldNotHaveNullReducerOnReduce() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.reduce(null));
    }

    /** Expects a TopologyException when {@code reduce} is materialized under {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when a time-windowed {@code reduce} is given a null reducer. */
    @Test
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .reduce(null, Materialized.as("store")));
    }

    /** Expects a NullPointerException when {@code windowedBy} is given a null {@code Windows}. */
    @Test
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        // The cast picks the Windows overload of windowedBy; a bare null would be ambiguous.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.windowedBy((Windows<?>) null));
    }

    /** Expects a TopologyException when a time-windowed {@code reduce} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when {@code aggregate} is given a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** Expects a NullPointerException when {@code aggregate} is given a null aggregator. */
    @Test
    public void shouldNotHaveNullAdderOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** Expects a TopologyException when {@code aggregate} is materialized under {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream.aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when a time-windowed {@code aggregate} is given a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** Expects a NullPointerException when a time-windowed {@code aggregate} is given a null aggregator. */
    @Test
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** Expects a NullPointerException when {@code windowedBy} is given a null {@code Windows}. */
    @Test
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        // The cast picks the Windows overload of windowedBy; a bare null would be ambiguous.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.windowedBy((Windows<?>) null));
    }

    /** Expects a TopologyException when a time-windowed {@code aggregate} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when a sliding-windowed {@code reduce} is given a null reducer. */
    @Test
    public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .reduce(null, Materialized.as("store")));
    }

    /** Expects a NullPointerException when {@code windowedBy} is given a null {@code SlidingWindows}. */
    @Test
    public void shouldNotHaveNullWindowsWithSlidingWindowedReduce() {
        // The cast picks the SlidingWindows overload of windowedBy.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.windowedBy((SlidingWindows) null));
    }

    /** Expects a TopologyException when a sliding-windowed {@code reduce} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when a sliding-windowed {@code aggregate} is given a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** Expects a NullPointerException when a sliding-windowed {@code aggregate} is given a null aggregator. */
    @Test
    public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** Expects a TopologyException when a sliding-windowed {@code aggregate} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /**
     * Counts per sliding window (500 ms time difference, 2000 ms grace) into the store
     * "aggregate-by-key-windowed" and checks the emitted results via
     * {@code doCountSlidingWindows}.
     */
    @Test
    public void shouldCountSlidingWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        groupedStream
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(500L), Duration.ofMillis(2000L)))
            .count(Materialized.as("aggregate-by-key-windowed"))
            .toStream()
            .process(supplier);

        doCountSlidingWindows(supplier);
    }

    /**
     * Same scenario as the named-store sliding-window count, but {@code count()} is called
     * without a {@code Materialized}; results are checked via {@code doCountSlidingWindows}.
     */
    @Test
    public void shouldCountSlidingWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        groupedStream
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(500L), Duration.ofMillis(2000L)))
            .count()
            .toStream()
            .process(supplier);

        doCountSlidingWindows(supplier);
    }

    /**
     * Pipes a fixed set of records for keys "1", "2" and "3" through the topology built so far
     * and verifies every record captured by {@code supplier}.
     *
     * <p>The captured records are sorted by key and then by window start before being compared
     * with the expected list. The sort compares nothing else, so records sharing a key and window
     * start keep their captured order.
     *
     * @param supplier processor supplier attached to the windowed count's output stream
     */
    private void doCountSlidingWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "A", 500L);
            inputTopic.pipeInput("1", "A", 999L);
            inputTopic.pipeInput("1", "A", 600L);
            inputTopic.pipeInput("2", "B", 500L);
            inputTopic.pipeInput("2", "B", 600L);
            inputTopic.pipeInput("2", "B", 700L);
            inputTopic.pipeInput("3", "C", 501L);
            inputTopic.pipeInput("1", "A", 1000L);
            inputTopic.pipeInput("1", "A", 1000L);
            inputTopic.pipeInput("2", "B", 1000L);
            inputTopic.pipeInput("2", "B", 1000L);
            inputTopic.pipeInput("3", "C", 600L);
        }

        // Explicitly typed lambda parameters: the first comparing(...) call is the receiver of
        // thenComparing(...), so it has no target type to infer the element type from.
        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().window().start());

        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(comparator);

        MatcherAssert.assertThat(actual, CoreMatchers.equalTo(Arrays.asList(
            // key "1"
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
            // key "2"
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 2L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 3L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
            // key "3"
            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L),
            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L)
        )));
    }

    /**
     * Pipes six records (five for key "1", one for key "2") through the topology built so far
     * and checks the last value and timestamp captured for three session windows.
     *
     * @param supplier processor supplier attached to the session aggregate's output stream
     */
    private void doAggregateSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "1", 10L);
            inputTopic.pipeInput("2", "2", 15L);
            inputTopic.pipeInput("1", "1", 30L);
            inputTopic.pipeInput("1", "1", 70L);
            inputTopic.pipeInput("1", "1", 100L);
            inputTopic.pipeInput("1", "1", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<Integer>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();

        Assert.assertEquals(
            ValueAndTimestamp.make(2, 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assert.assertEquals(
            ValueAndTimestamp.make(1, 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assert.assertEquals(
            ValueAndTimestamp.make(3, 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Aggregates a per-key record count over session windows (30 ms inactivity gap) into the
     * named store "session-store", verifies the results via {@code doAggregateSessionWindows},
     * and checks that the table reports "session-store" as its queryable store name.
     */
    @Test
    public void shouldAggregateSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withValueSerde(...), so its type arguments cannot be inferred from aggregate(...).
        final KTable<Windowed<String>, Integer> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .aggregate(
                () -> 0,
                (aggKey, value, aggregate) -> aggregate + 1,
                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                Materialized.<String, Integer, SessionStore<Bytes, byte[]>>as("session-store")
                    .withValueSerde(Serdes.Integer()));
        table.toStream().process(supplier);

        doAggregateSessionWindows(supplier);
        // JUnit order is (expected, actual) so a failure message reads correctly.
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    /**
     * Same aggregation as the named-store variant, but materialized with serdes only (no store
     * name). Verifies the results via {@code doAggregateSessionWindows} and, like the sibling
     * count/reduce "InternalStoreName" tests, that no queryable store name is exposed.
     */
    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Integer> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .aggregate(
                () -> 0,
                (aggKey, value, aggregate) -> aggregate + 1,
                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                Materialized.with(null, Serdes.Integer()));
        table.toStream().process(supplier);

        doAggregateSessionWindows(supplier);
        Assert.assertNull(table.queryableStoreName());
    }

    /**
     * Pipes six records (five for key "1", one for key "2") through the topology built so far
     * and checks the last count and timestamp captured for three session windows.
     *
     * @param supplier processor supplier attached to the session count's output stream
     */
    private void doCountSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "1", 10L);
            inputTopic.pipeInput("2", "2", 15L);
            inputTopic.pipeInput("1", "1", 30L);
            inputTopic.pipeInput("1", "1", 70L);
            inputTopic.pipeInput("1", "1", 100L);
            inputTopic.pipeInput("1", "1", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();

        Assert.assertEquals(
            ValueAndTimestamp.make(2L, 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assert.assertEquals(
            ValueAndTimestamp.make(1L, 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assert.assertEquals(
            ValueAndTimestamp.make(3L, 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Counts records per session window (30 ms inactivity gap) into the named store
     * "session-store", verifies the results via {@code doCountSessionWindows}, and checks that
     * the table reports "session-store" as its queryable store name.
     */
    @Test
    public void shouldCountSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Long> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .count(Materialized.as("session-store"));
        table.toStream().process(supplier);

        doCountSessionWindows(supplier);
        // JUnit order is (expected, actual) so a failure message reads correctly.
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    /**
     * Counts records per session window without a {@code Materialized}, verifies the results
     * via {@code doCountSessionWindows}, and checks that no queryable store name is exposed.
     */
    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Long> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .count();
        table.toStream().process(supplier);

        doCountSessionWindows(supplier);
        Assert.assertNull(table.queryableStoreName());
    }

    /**
     * Pipes six records (five for key "1", one for key "2") through the topology built so far
     * and checks the last reduced value and timestamp captured for three session windows.
     *
     * @param supplier processor supplier attached to the session reduce's output stream
     */
    private void doReduceSessionWindows(final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "A", 10L);
            inputTopic.pipeInput("2", "Z", 15L);
            inputTopic.pipeInput("1", "B", 30L);
            inputTopic.pipeInput("1", "A", 70L);
            inputTopic.pipeInput("1", "B", 100L);
            inputTopic.pipeInput("1", "C", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();

        Assert.assertEquals(
            ValueAndTimestamp.make("A:B", 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assert.assertEquals(
            ValueAndTimestamp.make("Z", 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assert.assertEquals(
            ValueAndTimestamp.make("A:B:C", 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Reduces values per session window (30 ms inactivity gap) by joining them with ":" into
     * the named store "session-store", verifies the results via {@code doReduceSessionWindows},
     * and checks that the table reports "session-store" as its queryable store name.
     */
    @Test
    public void shouldReduceSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, String> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
        table.toStream().process(supplier);

        doReduceSessionWindows(supplier);
        // JUnit order is (expected, actual) so a failure message reads correctly.
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    /**
     * Reduces values per session window without a {@code Materialized}, verifies the results
     * via {@code doReduceSessionWindows}, and checks that no queryable store name is exposed.
     */
    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier =
            new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, String> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .reduce((value1, value2) -> value1 + ":" + value2);
        table.toStream().process(supplier);

        doReduceSessionWindows(supplier);
        Assert.assertNull(table.queryableStoreName());
    }

    /** Expects a NullPointerException when a session-windowed {@code reduce} is given a null reducer. */
    @Test
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(null, Materialized.as("store")));
    }

    /** Expects a NullPointerException when {@code windowedBy} is given a null {@code SessionWindows}. */
    @Test
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        // The cast picks the SessionWindows overload of windowedBy.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.windowedBy((SessionWindows) null));
    }

    /** Expects a TopologyException when a session-windowed {@code reduce} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /**
     * Expects a NullPointerException when a session-windowed {@code reduce} is called with a
     * null reducer and a {@code Materialized} created from a null store name.
     */
    @Test
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        // The (String) cast selects the Materialized.as(String) overload.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(null, Materialized.as((String) null)));
    }

    /** Expects a NullPointerException when a session-windowed {@code aggregate} is given a null initializer. */
    @Test
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    null,
                    MockAggregator.TOSTRING_ADDER,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as("storeName")));
    }

    /** Expects a NullPointerException when a session-windowed {@code aggregate} is given a null aggregator. */
    @Test
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    null,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as("storeName")));
    }

    /** Expects a NullPointerException when a session-windowed {@code aggregate} is given a null session merger. */
    @Test
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    null,
                    Materialized.as("storeName")));
    }

    /** Expects a NullPointerException when {@code windowedBy} is given a null {@code SessionWindows}. */
    @Test
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        // The cast picks the SessionWindows overload of windowedBy.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.windowedBy((SessionWindows) null));
    }

    /**
     * Builds a session-windowed aggregate materialized with serdes only (no store name given);
     * the test passes if the call completes without throwing.
     */
    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                (aggKey, aggOne, aggTwo) -> null,
                Materialized.with(Serdes.String(), Serdes.String()));
    }

    /** Expects a TopologyException when a session-windowed {@code aggregate} uses {@code INVALID_STORE_NAME}. */
    @Test
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        Assert.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /** Expects a NullPointerException when {@code reduce} is given a null {@code Materialized}. */
    @Test
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.reduce(MockReducer.STRING_ADDER, null));
    }

    /** Expects a NullPointerException when {@code aggregate} is given a null {@code Materialized}. */
    @Test
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
    }

    /** Expects a NullPointerException when {@code count} is given a null {@code Materialized}. */
    @Test
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        // The cast selects the count(Materialized) overload.
        Assert.assertThrows(
            NullPointerException.class,
            () -> groupedStream.count((Materialized) null));
    }

    /**
     * Counts records per key into the store "count", feeds the {@code processData} fixture,
     * and checks the store contents through both the plain and the timestamped
     * key-value store views.
     */
    @Test
    public void shouldCountAndMaterializeResults() {
        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withKeySerde(...), so its type arguments cannot be inferred from count(...).
        groupedStream.count(
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                .withKeySerde(Serdes.String()));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
            MatcherAssert.assertThat(count.get("1"), CoreMatchers.equalTo(3L));
            MatcherAssert.assertThat(count.get("2"), CoreMatchers.equalTo(1L));
            MatcherAssert.assertThat(count.get("3"), CoreMatchers.equalTo(2L));

            final KeyValueStore<String, ValueAndTimestamp<Long>> timestampedCount =
                driver.getTimestampedKeyValueStore("count");
            MatcherAssert.assertThat(timestampedCount.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make(3L, 10L)));
            MatcherAssert.assertThat(timestampedCount.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 1L)));
            MatcherAssert.assertThat(timestampedCount.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 9L)));
        }
    }

    /**
     * Feeds the {@code processData} fixture (whose last record has a null value) into a count
     * and checks that a "Skipping record" message for offset 6 was logged by
     * {@code KStreamAggregate}.
     */
    @Test
    public void shouldLogAndMeasureSkipsInAggregate() {
        // The explicit type witness is required: Materialized.as(...) is the receiver of
        // withKeySerde(...), so its type arguments cannot be inferred from count(...).
        groupedStream.count(
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                .withKeySerde(Serdes.String()));

        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamAggregate.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[6]"));
        }
    }

    /**
     * Reduces values per key with {@code MockReducer.STRING_ADDER} into the store "reduce",
     * feeds the {@code processData} fixture, and checks the store contents through both the
     * plain and the timestamped key-value store views.
     */
    @Test
    public void shouldReduceAndMaterializeResults() {
        // The explicit type witness is required: Materialized.as(...) is the receiver of the
        // with*Serde(...) chain, so its type arguments cannot be inferred from reduce(...).
        groupedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
            MatcherAssert.assertThat(reduced.get("1"), CoreMatchers.equalTo("A+C+D"));
            MatcherAssert.assertThat(reduced.get("2"), CoreMatchers.equalTo("B"));
            MatcherAssert.assertThat(reduced.get("3"), CoreMatchers.equalTo("E+F"));

            final KeyValueStore<String, ValueAndTimestamp<String>> timestampedReduced =
                driver.getTimestampedKeyValueStore("reduce");
            MatcherAssert.assertThat(timestampedReduced.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedReduced.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("B", 1L)));
            MatcherAssert.assertThat(timestampedReduced.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("E+F", 9L)));
        }
    }

    /**
     * Feeds the {@code processData} fixture (whose last record has a null value) into a reduce
     * and checks that a "Skipping record" message for offset 6 was logged by
     * {@code KStreamReduce}.
     */
    @Test
    public void shouldLogAndMeasureSkipsInReduce() {
        // The explicit type witness is required: Materialized.as(...) is the receiver of the
        // with*Serde(...) chain, so its type arguments cannot be inferred from reduce(...).
        groupedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamReduce.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[6]"));
        }
    }

    /**
     * Aggregates values per key with {@code MockInitializer.STRING_INIT} and
     * {@code MockAggregator.TOSTRING_ADDER} into the store "aggregate", feeds the
     * {@code processData} fixture, and checks the store contents through both the plain and
     * the timestamped key-value store views.
     */
    @Test
    public void shouldAggregateAndMaterializeResults() {
        // The explicit type witness is required: Materialized.as(...) is the receiver of the
        // with*Serde(...) chain, so its type arguments cannot be inferred from aggregate(...).
        groupedStream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat(aggregate.get("1"), CoreMatchers.equalTo("0+A+C+D"));
            MatcherAssert.assertThat(aggregate.get("2"), CoreMatchers.equalTo("0+B"));
            MatcherAssert.assertThat(aggregate.get("3"), CoreMatchers.equalTo("0+E+F"));

            final KeyValueStore<String, ValueAndTimestamp<String>> timestampedAggregate =
                driver.getTimestampedKeyValueStore("aggregate");
            MatcherAssert.assertThat(timestampedAggregate.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedAggregate.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
            MatcherAssert.assertThat(timestampedAggregate.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
        }
    }

    /**
     * Aggregates without a {@code Materialized}, feeds the {@code processData} fixture, and
     * checks the last value and timestamp captured downstream for each key.
     */
    @Test
    public void shouldAggregateWithDefaultSerdes() {
        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        groupedStream
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            MatcherAssert.assertThat(
                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("1"),
                CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
            MatcherAssert.assertThat(
                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("2"),
                CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
            MatcherAssert.assertThat(
                supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("3"),
                CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
        }
    }

    /**
     * Pipes the shared fixture records into {@code TOPIC} through the given driver.
     *
     * <p>Six records carry explicit timestamps; the final record has a {@code null} value and
     * is piped without an explicit timestamp. The aggregate tests in this class expect key
     * {@code "3"} to end at {@code "0+E+F"}, i.e. with no contribution from that final record.
     *
     * @param driver the test driver whose topology reads from {@code TOPIC}
     */
    private void processData(TopologyTestDriver driver) {
        final TestInputTopic<String, String> inputTopic =
            driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());

        // key, value, timestamp -- timestamps for key "1" are deliberately not monotonic.
        inputTopic.pipeInput("1", "A", 5L);
        inputTopic.pipeInput("2", "B", 1L);
        inputTopic.pipeInput("1", "C", 3L);
        inputTopic.pipeInput("1", "D", 10L);
        inputTopic.pipeInput("3", "E", 8L);
        inputTopic.pipeInput("3", "F", 9L);

        // The cast selects the (key, value) overload; a bare null would be ambiguous with the
        // two-argument (value, Instant) overload now that the topic is typed.
        inputTopic.pipeInput("3", (String) null);
    }

    /**
     * Drives the topology built so far with a fixed sequence of records and asserts the exact
     * sequence of windowed counts captured by the supplier's processor.
     *
     * <p>Two things in the fixture are worth noting when reading the expected list:
     * the record for key "1" at timestamp 100 is piped after the one at 499 and is expected
     * with timestamp 499, and the final record for key "3" (timestamp 100) is piped after
     * records with timestamp 500 yet is still expected in the {@code TimeWindow(0, 500)}
     * result.
     *
     * @param supplier the capturing processor supplier already attached to the windowed count
     *                 stream by the calling test
     */
    private void doCountWindowed(MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());

            inputTopic.pipeInput("1", "A", 0L);
            inputTopic.pipeInput("1", "A", 499L);
            inputTopic.pipeInput("1", "A", 100L);
            inputTopic.pipeInput("2", "B", 0L);
            inputTopic.pipeInput("2", "B", 100L);
            inputTopic.pipeInput("2", "B", 200L);
            inputTopic.pipeInput("3", "C", 1L);
            inputTopic.pipeInput("1", "A", 500L);
            inputTopic.pipeInput("1", "A", 500L);
            inputTopic.pipeInput("2", "B", 500L);
            inputTopic.pipeInput("2", "B", 500L);
            inputTopic.pipeInput("3", "B", 100L);
        }

        // Checked once the driver has been closed; one expected entry per piped record,
        // in the order the records were piped.
        MatcherAssert.assertThat(
            supplier.theCapturedProcessor().processed(),
            CoreMatchers.equalTo(Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L))));
    }

    /**
     * Counts records per key in 500 ms time windows with a 100 ms grace period, materialized
     * under the store name {@code "aggregate-by-key-windowed"}, and delegates the input and
     * the assertions to {@code doCountWindowed}.
     */
    @Test
    public void shouldCountWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();

        // No raw Windows cast here: passing TimeWindows directly keeps the whole chain
        // generically typed, so the supplier's type parameters are checked by the compiler.
        groupedStream
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(500L), Duration.ofMillis(100L)))
            .count(Materialized.as("aggregate-by-key-windowed"))
            .toStream()
            .process(supplier);

        doCountWindowed(supplier);
    }

    /**
     * Counts records per key in 500 ms time windows with a 100 ms grace period without
     * naming a store (plain {@code count()}), and delegates the input and the assertions to
     * {@code doCountWindowed}.
     */
    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();

        // No raw Windows cast here: passing TimeWindows directly keeps the whole chain
        // generically typed, so the supplier's type parameters are checked by the compiler.
        groupedStream
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(500L), Duration.ofMillis(100L)))
            .count()
            .toStream()
            .process(supplier);

        doCountWindowed(supplier);
    }
}

