/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KStreamSlidingWindowAggregateTest {
    // Test parameter populated by the Parameterized runner from data(). When true, the tests
    // that choose a store supplier use InOrderMemoryWindowStoreSupplier ("InOrder" stores);
    // when false they use Stores.inMemoryWindowStore ("Reverse" stores).
    @Parameterized.Parameter
    public boolean inOrderIterator;
    // Streams configuration passed to every TopologyTestDriver created in this class; built
    // with String serdes (presumably installed as the default key/value serdes -- confirm in
    // StreamsTestUtils).
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    // Name of the thread that instantiated this test object. Not referenced in the portion of
    // the class visible here; NOTE(review): confirm it is used further down.
    private final String threadId = Thread.currentThread().getName();

    @Parameterized.Parameters(name="{0}")
    public static Collection<Boolean[]> data() {
        return Arrays.asList({false}, {true});
    }

    /**
     * Pipes five records for key "A" with increasing timestamps (10, 15, 20, 22, 30)
     * through a sliding-window aggregation (time difference 10 ms, grace 50 ms) and
     * asserts the last value and timestamp emitted for each window, keyed by window start.
     */
    @Test
    public void testAggregateSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";
        // Declared as the shared interface type: the two branches produce different implementations.
        final WindowBytesStoreSupplier storeSupplier = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", "2", 15L);
            inputTopic.pipeInput("A", "3", 20L);
            inputTopic.pipeInput("A", "4", 22L);
            inputTopic.pipeInput("A", "5", 30L);
        }

        // Later emissions for the same window start overwrite earlier ones, so only
        // the final result per window is compared.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(entry.key().window().start(), ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
        expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
        expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
        expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
        expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
        expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
        expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
        expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
        expected.put(23L, ValueAndTimestamp.make("0+5", 30L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes six records for key "A" with increasing timestamps (10, 14, 15, 22, 26, 30)
     * through a sliding-window reduce (time difference 10 ms, grace 50 ms) and asserts
     * the last value and timestamp emitted for each window, keyed by window start.
     */
    @Test
    public void testReduceSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";
        // Declared as the shared interface type: the two branches produce different implementations.
        final WindowBytesStoreSupplier storeSupplier = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .reduce(MockReducer.STRING_ADDER, Materialized.as(storeSupplier));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", "2", 14L);
            inputTopic.pipeInput("A", "3", 15L);
            inputTopic.pipeInput("A", "4", 22L);
            inputTopic.pipeInput("A", "5", 26L);
            inputTopic.pipeInput("A", "6", 30L);
        }

        // Later emissions for the same window start overwrite earlier ones, so only
        // the final result per window is compared.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(entry.key().window().start(), ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("1", 10L));
        expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
        expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
        expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
        expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
        expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
        expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
        expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
        expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
        expected.put(27L, ValueAndTimestamp.make("6", 30L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes records for keys "A" through "D", some of them out of timestamp order, through a
     * sliding-window aggregation (time difference 10 ms, grace 50 ms) and asserts the complete
     * list of emitted results after sorting by record key and then by window start.
     */
    @Test
    public void testAggregateLargeInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        // Declared as the shared interface type: the two branches produce different implementations.
        final WindowBytesStoreSupplier storeSupplier = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic1 =
                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
            inputTopic1.pipeInput("A", "1", 10L);
            inputTopic1.pipeInput("A", "2", 20L);
            inputTopic1.pipeInput("A", "3", 22L);
            inputTopic1.pipeInput("A", "4", 15L);

            inputTopic1.pipeInput("B", "1", 12L);
            inputTopic1.pipeInput("B", "2", 13L);
            inputTopic1.pipeInput("B", "3", 18L);
            inputTopic1.pipeInput("B", "4", 19L);
            inputTopic1.pipeInput("B", "5", 25L);
            inputTopic1.pipeInput("B", "6", 14L);

            inputTopic1.pipeInput("C", "1", 11L);
            inputTopic1.pipeInput("C", "2", 15L);
            inputTopic1.pipeInput("C", "3", 16L);
            inputTopic1.pipeInput("C", "4", 21L);
            inputTopic1.pipeInput("C", "5", 23L);

            inputTopic1.pipeInput("D", "4", 11L);
            inputTopic1.pipeInput("D", "2", 12L);
            inputTopic1.pipeInput("D", "3", 29L);
            inputTopic1.pipeInput("D", "5", 16L);
        }

        // The lambda parameters are typed explicitly: at the head of a method chain the
        // compiler has no target type to infer them from.
        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());

        // List.sort is stable, so results for the same key and window keep their emission order.
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(comparator);

        final List<KeyValueTimestamp<Windowed<String>, String>> expected = Arrays.asList(
            // key A
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 10L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+4", 15L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1+2", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1+2+4", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+2", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+2+4", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12L, 22L)), "0+2+3", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12L, 22L)), "0+2+3+4", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16L, 26L)), "0+2+3", 22L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21L, 31L)), "0+3", 22L),
            // key B
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2L, 12L)), "0+1", 12L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3L, 13L)), "0+1+2", 13L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4L, 14L)), "0+1+2+6", 14L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+1+2+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8L, 18L)), "0+1+2+3+6", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9L, 19L)), "0+1+2+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9L, 19L)), "0+1+2+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2", 13L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13L, 23L)), "0+2+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3", 18L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14L, 24L)), "0+3+4+6", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+3+4+5", 25L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19L, 29L)), "0+4", 19L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19L, 29L)), "0+4+5", 25L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20L, 30L)), "0+5", 25L),
            // key C
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1L, 11L)), "0+1", 11L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+1+2", 15L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6L, 16L)), "0+1+2+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11L, 21L)), "0+1+2+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2", 15L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12L, 22L)), "0+2+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+2+3+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3", 16L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16L, 26L)), "0+3+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17L, 27L)), "0+4", 21L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17L, 27L)), "0+4+5", 23L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22L, 32L)), "0+5", 23L),
            // key D
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1L, 11L)), "0+4", 11L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2L, 12L)), "0+4+2", 12L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6L, 16L)), "0+4+2+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12L, 22L)), "0+2", 12L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12L, 22L)), "0+2+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13L, 23L)), "0+5", 16L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19L, 29L)), "0+3", 29L)
        );

        Assert.assertEquals(expected, actual);
    }

    /**
     * Builds two sliding-window aggregations (time difference 10 ms, grace 100 ms) over
     * "topic1" and "topic2", joins the resulting tables, and after each batch of input checks
     * what each of the three captured processors received: index 0 is attached to table1,
     * index 1 to table2 and index 2 to the join result.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";
        final String topic2 = "topic2";
        // Declared as the shared interface type: the two branches produce different implementations.
        final WindowBytesStoreSupplier storeSupplier1 = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder1", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse1", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);
        final WindowBytesStoreSupplier storeSupplier2 = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder2", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse2", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table1 = builder
            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier1));
        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier2));

        // The same supplier is attached three times; it hands out one captured processor per attachment.
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table1.toStream().process(supplier);
        table2.toStream().process(supplier);
        table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic1 =
                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> inputTopic2 =
                driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer());

            inputTopic1.pipeInput("A", "1", 10L);
            inputTopic1.pipeInput("B", "2", 11L);
            inputTopic1.pipeInput("C", "3", 12L);

            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);

            processors.get(0).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1L, 11L)), "0+2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+3", 12L)
            );
            processors.get(1).checkAndClearProcessResult();
            processors.get(2).checkAndClearProcessResult();

            inputTopic1.pipeInput("A", "1", 15L);
            inputTopic1.pipeInput("B", "2", 16L);
            inputTopic1.pipeInput("C", "3", 19L);

            processors.get(0).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+1", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12L, 22L)), "0+2", 16L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+2+2", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+3", 19L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9L, 19L)), "0+3+3", 19L)
            );
            processors.get(1).checkAndClearProcessResult();
            processors.get(2).checkAndClearProcessResult();

            inputTopic2.pipeInput("A", "a", 10L);
            inputTopic2.pipeInput("B", "b", 30L);
            inputTopic2.pipeInput("C", "c", 12L);
            inputTopic2.pipeInput("C", "c", 35L);

            processors.get(0).checkAndClearProcessResult();
            processors.get(1).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20L, 30L)), "0+b", 30L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+c", 12L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+c", 35L)
            );
            processors.get(2).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1%0+a", 10L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2L, 12L)), "0+3%0+c", 12L)
            );

            inputTopic2.pipeInput("A", "a", 15L);
            inputTopic2.pipeInput("B", "b", 16L);
            inputTopic2.pipeInput("C", "c", 17L);

            processors.get(0).checkAndClearProcessResult();
            processors.get(1).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+a+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+b", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+c", 17L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7L, 17L)), "0+c+c", 17L)
            );
            processors.get(2).checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11L, 21L)), "0+1%0+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1%0+a+a", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6L, 16L)), "0+2+2%0+b", 16L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13L, 23L)), "0+3%0+c", 19L)
            );
        }
    }

    /**
     * Pipes six records for key "A" whose timestamps (0, 5, 6, 3, 13, 10) are all smaller than
     * the 50 ms time difference of the sliding window (grace 200 ms), two of them out of order,
     * and asserts the last value and timestamp emitted for each window, keyed by window start.
     */
    @Test
    public void testEarlyRecordsSmallInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";

        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(50L), Duration.ofMillis(200L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // Explicit type arguments: inference cannot see through the chained withValueSerde call.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 0L);
            inputTopic.pipeInput("A", "2", 5L);
            inputTopic.pipeInput("A", "3", 6L);
            inputTopic.pipeInput("A", "4", 3L);
            inputTopic.pipeInput("A", "5", 13L);
            inputTopic.pipeInput("A", "6", 10L);
        }

        // Later emissions for the same window start overwrite earlier ones, so only
        // the final result per window is compared.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(entry.key().window().start(), ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes seven records for key "A" with repeated timestamps (0, 2, 4, 0, 2, 2, 0), all
     * smaller than the 5 ms time difference of the sliding window (grace 20 ms), and asserts
     * the last value and timestamp emitted for each window, keyed by window start.
     */
    @Test
    public void testEarlyRecordsRepeatedInput() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic = "topic";

        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(5L), Duration.ofMillis(20L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                // Explicit type arguments: inference cannot see through the chained withValueSerde call.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("A", "1", 0L);
            inputTopic.pipeInput("A", "2", 2L);
            inputTopic.pipeInput("A", "3", 4L);
            inputTopic.pipeInput("A", "4", 0L);
            inputTopic.pipeInput("A", "5", 2L);
            inputTopic.pipeInput("A", "6", 2L);
            inputTopic.pipeInput("A", "7", 0L);
        }

        // Later emissions for the same window start overwrite earlier ones, so only
        // the final result per window is compared.
        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
            actual.put(entry.key().window().start(), ValueAndTimestamp.make(entry.value(), entry.timestamp()));
        }

        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6+7", 4L));
        expected.put(1L, ValueAndTimestamp.make("0+2+3+5+6", 4L));
        expected.put(3L, ValueAndTimestamp.make("0+3", 4L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Pipes nine out-of-order records for key {@code "E"} (timestamps between 0 and 15) through a
     * sliding-window aggregation with a 10 ms time difference and a 50 ms grace period, then
     * compares every captured window update against the expected list.
     *
     * <p>The captured updates are sorted by record key and then by window start before the
     * comparison. {@code List.sort} is stable, so updates to the same window keep the order in
     * which they were emitted.
     */
    @Test
    // MockProcessorSupplier is declared outside this section of the file, so it is kept raw as in
    // the original; the unchecked conversions are limited to the capture/readback of results.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testEarlyRecordsLargeInput() {
        final String topic = "topic";
        final StreamsBuilder builder = new StreamsBuilder();
        // Declared as the supplier interface: the two branches share no more specific type.
        final WindowBytesStoreSupplier storeSupplier = this.inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table2 = builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(50L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier));

        final MockProcessorSupplier supplier = new MockProcessorSupplier();
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic1 =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic1.pipeInput("E", "1", 0L);
            inputTopic1.pipeInput("E", "3", 5L);
            inputTopic1.pipeInput("E", "4", 6L);
            inputTopic1.pipeInput("E", "2", 3L);
            inputTopic1.pipeInput("E", "6", 13L);
            inputTopic1.pipeInput("E", "5", 10L);
            inputTopic1.pipeInput("E", "7", 4L);
            inputTopic1.pipeInput("E", "8", 2L);
            inputTopic1.pipeInput("E", "9", 15L);
        }

        final Comparator<KeyValueTimestamp<Windowed<String>, String>> byKeyThenWindowStart =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> r) -> r.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, String> r) -> r.key().window().start());

        final List<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
        actual.sort(byKeyThenWindowStart);

        Assert.assertEquals(
            Arrays.asList(
                // window [0, 10]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5+7", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0L, 10L)), "0+1+3+4+2+5+7+8", 10L),
                // window [1, 11]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3", 5L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5+7", 10L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1L, 11L)), "0+3+4+2+5+7+8", 10L),
                // window [3, 13]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3L, 13L)), "0+3+4+2+6+5+7", 13L),
                // window [4, 14]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4L, 14L)), "0+3+4+6+5+7", 13L),
                // window [5, 15]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5L, 15L)), "0+3+4+6+5+9", 15L),
                // window [6, 16]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6L, 16L)), "0+4+6+5+9", 15L),
                // window [7, 17]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7L, 17L)), "0+6+5+9", 15L),
                // window [11, 21]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6", 13L),
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11L, 21L)), "0+6+9", 15L),
                // window [14, 24]
                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14L, 24L)), "0+9", 15L)),
            actual);
    }

    /**
     * Pipes a single record with a {@code null} key into a sliding-window aggregation and checks
     * that a WARN-level "Skipping record due to null key or value" message was logged for it.
     *
     * <p>Despite the method name, only the log output is asserted here; no metric is checked.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        final String topic = "topic";
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.toStringInstance("+"),
                // The explicit type witness is required: without it the key/value types of the
                // Materialized are inferred as Object and a Serde<String> is not accepted.
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized")
                    .withValueSerde(Serdes.String()));

        this.props.setProperty("built.in.metrics.version", "latest");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput(null, "1");

            final List<String> warnings = appender.getEvents().stream()
                .filter(e -> e.getLevel().equals("WARN"))
                .map(LogCaptureAppender.Event::getMessage)
                .collect(Collectors.toList());
            MatcherAssert.assertThat(
                warnings,
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    /**
     * Checks the handling of records that arrive after their window's grace period (90 ms here).
     *
     * <p>The first record (timestamp 200) is followed by seven records with older timestamps
     * (100..105 and 15). The test expects all seven to be counted as dropped, the lateness metrics
     * to report a maximum of 185 and an average of 96.25, one "Skipping record for expired window"
     * log message per dropped record, and exactly one output record for the window [190, 200].
     */
    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
        final String topic = "topic";
        final StreamsBuilder builder = new StreamsBuilder();
        // Declared as the supplier interface: the two branches share no more specific type.
        final WindowBytesStoreSupplier storeSupplier = this.inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
        stream1
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(90L)))
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(storeSupplier))
            .toStream()
            .to("output");

        this.props.setProperty("built.in.metrics.version", "latest");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("k", "100", 200L);
            inputTopic.pipeInput("k", "0", 100L);
            inputTopic.pipeInput("k", "1", 101L);
            inputTopic.pipeInput("k", "2", 102L);
            inputTopic.pipeInput("k", "3", 103L);
            inputTopic.pipeInput("k", "4", 104L);
            inputTopic.pipeInput("k", "5", 105L);
            inputTopic.pipeInput("k", "6", 15L);

            this.assertLatenessMetrics(driver, CoreMatchers.is(7.0), CoreMatchers.is(185.0), CoreMatchers.is(96.25));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItems(
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"));

            final TestOutputTopic<Windowed<String>, String> outputTopic = driver.createOutputTopic(
                "output",
                new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
                new StringDeserializer());
            // Only the on-time record may have produced output.
            MatcherAssert.assertThat(
                outputTopic.readRecord(),
                CoreMatchers.equalTo(new TestRecord<>(new Windowed<>("k", new TimeWindow(190L, 200L)), "0+100", null, 200L)));
            Assert.assertTrue(outputTopic.isEmpty());
        }
    }

    /**
     * Randomized test: pipes a fixed set of 21 records for key {@code "A"} in a shuffled order
     * and verifies that the final value of every window matches
     * {@link #verifyRandomTestResults(Map)}.
     *
     * <p>The shuffle seed is drawn at random on each run. Any failure is rethrown as an
     * {@link AssertionError} whose message contains the seed, so the run can be reproduced.
     */
    @Test
    // MockProcessorSupplier is declared outside this section of the file, so it is kept raw as in
    // the original; the unchecked conversions are limited to the capture/readback of results.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testAggregateRandomInput() {
        final String topic1 = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        // Declared as the supplier interface: the two branches share no more specific type.
        final WindowBytesStoreSupplier storeSupplier = this.inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000L), Duration.ofMillis(10L), false);

        final KTable<Windowed<String>, String> table = builder
            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(10000L)))
            .aggregate(
                () -> "",
                // Keep the aggregate's characters sorted so that a window's final value does not
                // depend on the order in which its records were processed.
                (key, value, aggregate) -> {
                    final char[] sorted = (aggregate + value).toCharArray();
                    Arrays.sort(sorted);
                    return String.valueOf(sorted);
                },
                Materialized.as(storeSupplier));

        final MockProcessorSupplier supplier = new MockProcessorSupplier();
        table.toStream().process(supplier);

        final long seed = new Random().nextLong();
        final Random shuffle = new Random(seed);
        try {
            final List<ValueAndTimestamp<String>> input = Arrays.asList(
                ValueAndTimestamp.make("A", 10L),
                ValueAndTimestamp.make("B", 15L),
                ValueAndTimestamp.make("C", 16L),
                ValueAndTimestamp.make("D", 18L),
                ValueAndTimestamp.make("E", 30L),
                ValueAndTimestamp.make("F", 40L),
                ValueAndTimestamp.make("G", 55L),
                ValueAndTimestamp.make("H", 56L),
                ValueAndTimestamp.make("I", 58L),
                ValueAndTimestamp.make("J", 58L),
                ValueAndTimestamp.make("K", 62L),
                ValueAndTimestamp.make("L", 63L),
                ValueAndTimestamp.make("M", 63L),
                ValueAndTimestamp.make("N", 63L),
                ValueAndTimestamp.make("O", 76L),
                ValueAndTimestamp.make("P", 77L),
                ValueAndTimestamp.make("Q", 80L),
                ValueAndTimestamp.make("R", 2L),
                ValueAndTimestamp.make("S", 3L),
                ValueAndTimestamp.make("T", 5L),
                ValueAndTimestamp.make("U", 8L));
            Collections.shuffle(input, shuffle);

            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
                final TestInputTopic<String, String> inputTopic1 =
                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
                for (final ValueAndTimestamp<String> record : input) {
                    inputTopic1.pipeInput("A", record.value(), record.timestamp());
                }
            }

            final List<KeyValueTimestamp<Windowed<String>, String>> processed =
                supplier.theCapturedProcessor().processed();
            // Keep only the last update seen for each window start.
            final Map<Long, ValueAndTimestamp<String>> results = new HashMap<>();
            for (final KeyValueTimestamp<Windowed<String>, String> entry : processed) {
                final Long start = entry.key().window().start();
                results.put(start, ValueAndTimestamp.make(entry.value(), entry.timestamp()));
            }
            this.verifyRandomTestResults(results);
        } catch (final AssertionError t) {
            throw new AssertionError("Assertion failed in randomized test. Reproduce with seed: " + seed + ".", t);
        } catch (final Throwable t) {
            final String msg = "Exception in randomized scenario. Reproduce with seed: " + seed + ".";
            throw new AssertionError(msg, t);
        }
    }

    /**
     * Asserts that {@code actual} equals the fixed table of expected final window results used by
     * the randomized aggregation test.
     *
     * @param actual final value and timestamp per window, keyed by window start time
     */
    private void verifyRandomTestResults(final Map<Long, ValueAndTimestamp<String>> actual) {
        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
        // key: window start; value: sorted aggregate and the window's result timestamp
        expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
        expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
        expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
        expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
        expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
        expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
        expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
        expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
        expected.put(16L, ValueAndTimestamp.make("CD", 18L));
        expected.put(17L, ValueAndTimestamp.make("D", 18L));
        expected.put(20L, ValueAndTimestamp.make("E", 30L));
        expected.put(30L, ValueAndTimestamp.make("EF", 40L));
        expected.put(31L, ValueAndTimestamp.make("F", 40L));
        expected.put(45L, ValueAndTimestamp.make("G", 55L));
        expected.put(46L, ValueAndTimestamp.make("GH", 56L));
        expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
        expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
        expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
        expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
        expected.put(57L, ValueAndTimestamp.make("IJKLMN", 63L));
        expected.put(59L, ValueAndTimestamp.make("KLMN", 63L));
        expected.put(63L, ValueAndTimestamp.make("LMN", 63L));
        expected.put(66L, ValueAndTimestamp.make("O", 76L));
        expected.put(67L, ValueAndTimestamp.make("OP", 77L));
        expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
        expected.put(77L, ValueAndTimestamp.make("PQ", 80L));
        expected.put(78L, ValueAndTimestamp.make("Q", 80L));

        Assert.assertEquals(expected, actual);
    }

    /**
     * Checks the dropped-record and record-lateness task metrics reported by {@code driver} for
     * task {@code 0_0} on this test's thread id.
     *
     * <p>The dropped-records rate is only required to differ from {@code 0.0}; the other three
     * metrics are checked with the supplied matchers.
     *
     * @param driver      test driver whose metrics are inspected
     * @param dropTotal   matcher for {@code dropped-records-total}
     * @param maxLateness matcher for {@code record-lateness-max}
     * @param avgLateness matcher for {@code record-lateness-avg}
     */
    private void assertLatenessMetrics(final TopologyTestDriver driver,
                                       final Matcher<Object> dropTotal,
                                       final Matcher<Object> maxLateness,
                                       final Matcher<Object> avgLateness) {
        final String group = "stream-task-metrics";
        final Map<String, String> taskTags = Utils.mkMap(
            Utils.mkEntry("thread-id", this.threadId),
            Utils.mkEntry("task-id", "0_0"));

        final MetricName dropTotalMetric = new MetricName(
            "dropped-records-total",
            group,
            "The total number of dropped records",
            taskTags);
        final MetricName dropRateMetric = new MetricName(
            "dropped-records-rate",
            group,
            "The average number of dropped records per second",
            taskTags);
        final MetricName latenessMaxMetric = new MetricName(
            "record-lateness-max",
            group,
            "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time",
            taskTags);
        final MetricName latenessAvgMetric = new MetricName(
            "record-lateness-avg",
            group,
            "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time",
            taskTags);

        final Object dropTotalValue = driver.metrics().get(dropTotalMetric).metricValue();
        MatcherAssert.assertThat(dropTotalValue, dropTotal);

        final Object dropRateValue = driver.metrics().get(dropRateMetric).metricValue();
        MatcherAssert.assertThat(dropRateValue, CoreMatchers.not(0.0));

        final Object latenessMaxValue = driver.metrics().get(latenessMaxMetric).metricValue();
        MatcherAssert.assertThat(latenessMaxValue, maxLateness);

        final Object latenessAvgValue = driver.metrics().get(latenessAvgMetric).metricValue();
        MatcherAssert.assertThat(latenessAvgValue, avgLateness);
    }

    private static class InOrderMemoryWindowStoreSupplier
    extends InMemoryWindowBytesStoreSupplier {
        InOrderMemoryWindowStoreSupplier(String name, long retentionPeriod, long windowSize, boolean retainDuplicates) {
            super(name, retentionPeriod, windowSize, retainDuplicates);
        }

        public WindowStore<Bytes, byte[]> get() {
            return new InOrderMemoryWindowStore(this.name(), this.retentionPeriod(), this.windowSize(), this.retainDuplicates(), this.metricsScope());
        }
    }

    private static class InOrderMemoryWindowStore
    extends InMemoryWindowStore {
        InOrderMemoryWindowStore(String name, long retentionPeriod, long windowSize, boolean retainDuplicates, String metricScope) {
            super(name, retentionPeriod, windowSize, retainDuplicates, metricScope);
        }

        public WindowStoreIterator<byte[]> backwardFetch(Bytes key, long timeFrom, long timeTo) {
            throw new UnsupportedOperationException("Backward fetch not supported here");
        }

        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo) {
            throw new UnsupportedOperationException("Backward fetch not supported here");
        }

        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
            throw new UnsupportedOperationException("Backward fetch not supported here");
        }

        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
            throw new UnsupportedOperationException("Backward fetch not supported here");
        }
    }
}

