package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.class */
public class KStreamWindowAggregateTest {
    // Store-name constant; no use of it is visible in this part of the file.
    private static final String WINDOW_STORE_NAME = "dummy-store-name";
    // Streams configuration handed to every TopologyTestDriver below; built from two String serdes
    // (presumably the default key and value serdes -- confirm against StreamsTestUtils).
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    // Name of the thread that constructed this test instance.
    private final String threadId = Thread.currentThread().getName();

    // Runner-injected parameter (no explicit index): the emit strategy type under test.
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;

    // Runner-injected parameter 1: the cache flag. Expected results in processEmitFinalJoin branch on it;
    // presumably it also drives setMaterializedCache, which is defined outside this view -- confirm.
    @Parameterized.Parameter(1)
    public boolean withCache;
    // Strategy instance derived from `type` in before().
    private EmitStrategy emitStrategy;
    // True when `type` is ON_WINDOW_CLOSE; computed in before().
    private boolean emitFinal;

    /**
     * Supplies the parameter rows for the parameterized runner: each emit strategy type
     * paired with the cache flag set to {@code true} and then {@code false}.
     *
     * @return four rows of {@code {strategyType, withCache}}
     */
    @Parameterized.Parameters(name = "{0}_cache:{1}")
    public static Collection<Object[]> getEmitStrategy() {
        final Object[][] combinations = {
            {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true},
            {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false},
            {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true},
            {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false}
        };
        return Arrays.asList(combinations);
    }

    /**
     * Derives the per-test state from the injected {@code type} parameter:
     * the emit-final flag and the matching {@link EmitStrategy} instance.
     */
    @Before
    public void before() {
        final EmitStrategy.StrategyType onClose = EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
        emitFinal = type.equals(onClose);
        emitStrategy = EmitStrategy.StrategyType.forType(type);
    }

    /**
     * Aggregates records from one topic in 10 ms windows advancing by 5 ms (100 ms grace)
     * and verifies what the downstream processor captured: nothing when emitting on window
     * close, otherwise one update per affected window for every input record, including the
     * three out-of-order records for key "B" piped at the end.
     */
    @Test
    public void testAggBasic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> windowedTable = builder
            .stream(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)).advanceBy(Duration.ofMillis(5L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, setMaterializedCache(Materialized.as("topic1-Canonized").withValueSerde(Serdes.String())));

        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedTable.toStream().process(supplier);

        // try-with-resources closes the driver exactly once, whether or not piping fails.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new StringSerializer(), new StringSerializer());

            inputTopic.pipeInput("A", "1", 0L);
            inputTopic.pipeInput("B", "2", 1L);
            inputTopic.pipeInput("C", "3", 2L);
            inputTopic.pipeInput("D", "4", 3L);
            inputTopic.pipeInput("A", "1", 4L);

            inputTopic.pipeInput("A", "1", 5L);
            inputTopic.pipeInput("B", "2", 6L);
            inputTopic.pipeInput("D", "4", 7L);
            inputTopic.pipeInput("B", "2", 8L);
            inputTopic.pipeInput("C", "3", 9L);

            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("B", "2", 11L);
            inputTopic.pipeInput("D", "4", 12L);
            inputTopic.pipeInput("B", "2", 13L);
            inputTopic.pipeInput("C", "3", 14L);

            // Out-of-order records for key "B".
            inputTopic.pipeInput("B", "1", 3L);
            inputTopic.pipeInput("B", "2", 2L);
            inputTopic.pipeInput("B", "3", 9L);
        }

        // Assertions run after the driver is closed, so a failing assertion cannot trigger a second close().
        if (this.emitFinal) {
            Assert.assertTrue(supplier.theCapturedProcessor().processed().isEmpty());
            return;
        }

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2", 1L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3", 2L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4", 3L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1", 4L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1+1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2", 6L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4+4", 7L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4", 7L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2", 8L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2", 8L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3+3", 9L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+3", 9L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2+2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4+4", 12L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(10L, 20L)), "0+4", 12L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2+2+2", 13L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+2+2", 13L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+3+3", 14L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(10L, 20L)), "0+3", 14L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2+1", 8L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2+1+2", 8L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2+1+2+3", 9L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2+2+2+3", 13L)
            ),
            supplier.theCapturedProcessor().processed()
        );
    }

    /**
     * Builds two windowed aggregations (10 ms windows advancing by 5 ms) over two topics,
     * joins the resulting tables, and attaches the same capturing supplier to both
     * aggregations and to the join. The grace period is 5 ms when emitting on window close
     * and 100 ms otherwise. The expected results are verified by
     * {@code processEmitFinalJoin} or {@code processEmitUpdateJoin} while the driver is open.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final long graceMs = this.emitFinal ? 5L : 100L;

        final KTable<Windowed<String>, String> firstTable = builder
            .stream(AssignmentTestUtils.TP_1_NAME, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(graceMs)).advanceBy(Duration.ofMillis(5L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, setMaterializedCache(Materialized.as("topic1-Canonized").withValueSerde(Serdes.String())));

        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        firstTable.toStream().process(supplier);

        final KTable<Windowed<String>, String> secondTable = builder
            .stream(AssignmentTestUtils.TP_2_NAME, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(graceMs)).advanceBy(Duration.ofMillis(5L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, setMaterializedCache(Materialized.as("topic2-Canonized").withValueSerde(Serdes.String())));
        secondTable.toStream().process(supplier);

        firstTable.join(secondTable, (left, right) -> left + "%" + right).toStream().process(supplier);

        // try-with-resources closes the driver once and keeps the test failure as the primary
        // exception if close() also throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic1 =
                driver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> inputTopic2 =
                driver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new StringSerializer(), new StringSerializer());

            if (this.emitFinal) {
                processEmitFinalJoin(inputTopic1, inputTopic2, supplier);
            } else {
                processEmitUpdateJoin(inputTopic1, inputTopic2, supplier);
            }
        }
    }

    /**
     * Drives the join topology for the emit-on-window-close case and checks, after each batch
     * of input, what each of the three captured processors received.
     *
     * @param testInputTopic           input for the first aggregation
     * @param testInputTopic2          input for the second aggregation
     * @param mockApiProcessorSupplier supplier that must have captured exactly three processors:
     *                                 index 0 and 1 follow the two aggregations, index 2 the join
     */
    private void processEmitFinalJoin(TestInputTopic<String, String> testInputTopic, TestInputTopic<String, String> testInputTopic2, MockApiProcessorSupplier<Windowed<String>, String, Void, Void> mockApiProcessorSupplier) {
        testInputTopic.pipeInput("A", "1", 0L);
        testInputTopic.pipeInput("B", "2", 1L);
        testInputTopic.pipeInput("C", "3", 2L);
        testInputTopic.pipeInput("D", "4", 3L);
        testInputTopic.pipeInput("A", "1", 9L);
        testInputTopic.pipeInput("A", "1", 15L);

        final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = mockApiProcessorSupplier.capturedProcessors(3);
        final MockApiProcessor<Windowed<String>, String, Void, Void> firstAgg = processors.get(0);
        final MockApiProcessor<Windowed<String>, String, Void, Void> secondAgg = processors.get(1);
        final MockApiProcessor<Windowed<String>, String, Void, Void> joined = processors.get(2);

        firstAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1", 9L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2", 1L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3", 2L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4", 3L)
        );
        secondAgg.checkAndClearProcessResult();
        joined.checkAndClearProcessResult();

        testInputTopic.pipeInput("A", "1", 10L);
        testInputTopic.pipeInput("B", "2", 11L);
        testInputTopic.pipeInput("D", "4", 12L);
        testInputTopic.pipeInput("B", "2", 13L);
        testInputTopic.pipeInput("C", "3", 14L);
        testInputTopic.pipeInput("A", "1", 20L);

        firstAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 10L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2", 13L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+3", 14L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4", 12L)
        );
        secondAgg.checkAndClearProcessResult();
        joined.checkAndClearProcessResult();

        testInputTopic2.pipeInput("A", "a", 0L);
        testInputTopic2.pipeInput("B", "b", 1L);
        testInputTopic2.pipeInput("C", "c", 2L);
        testInputTopic2.pipeInput("D", "d", 10L);
        testInputTopic2.pipeInput("A", "a", 15L);

        firstAgg.checkAndClearProcessResult();
        if (!this.withCache) {
            // NOTE(review): processor 0 was already verified empty on the line above, so this
            // repeats that check; processor 1 may have been intended here -- confirm.
            firstAgg.checkAndClearProcessResult();
            joined.checkAndClearProcessResult();
        } else {
            secondAgg.checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a", 0L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+b", 1L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+c", 2L)
            );
            joined.checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1%0+a", 9L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2%0+b", 1L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3%0+c", 2L)
            );
        }

        testInputTopic2.pipeInput("A", "a", 5L);
        testInputTopic2.pipeInput("B", "b", 6L);
        testInputTopic2.pipeInput("D", "d", 7L);
        testInputTopic2.pipeInput("D", "d", 18L);
        testInputTopic2.pipeInput("A", "a", 21L);

        firstAgg.checkAndClearProcessResult();
        if (!this.withCache) {
            // Without the cache, the [0,10) results of the second aggregation are expected here,
            // together with the [5,15) ones.
            secondAgg.checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a", 0L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+b", 1L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+c", 2L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+a", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+b", 6L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+d+d", 10L)
            );
        } else {
            secondAgg.checkAndClearProcessResult(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+a", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+b", 6L),
                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+d+d", 10L)
            );
        }
        joined.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1%0+a", 10L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2%0+b", 13L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4%0+d+d", 12L)
        );
    }

    /**
     * Drives the join topology for the emit-on-update case and checks, after each batch of
     * input, what each of the three captured processors received.
     *
     * @param testInputTopic           input for the first aggregation
     * @param testInputTopic2          input for the second aggregation
     * @param mockApiProcessorSupplier supplier that must have captured exactly three processors:
     *                                 index 0 and 1 follow the two aggregations, index 2 the join
     */
    private void processEmitUpdateJoin(TestInputTopic<String, String> testInputTopic, TestInputTopic<String, String> testInputTopic2, MockApiProcessorSupplier<Windowed<String>, String, Void, Void> mockApiProcessorSupplier) {
        testInputTopic.pipeInput("A", "1", 0L);
        testInputTopic.pipeInput("B", "2", 1L);
        testInputTopic.pipeInput("C", "3", 2L);
        testInputTopic.pipeInput("D", "4", 3L);
        testInputTopic.pipeInput("A", "1", 9L);

        final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = mockApiProcessorSupplier.capturedProcessors(3);
        final MockApiProcessor<Windowed<String>, String, Void, Void> firstAgg = processors.get(0);
        final MockApiProcessor<Windowed<String>, String, Void, Void> secondAgg = processors.get(1);
        final MockApiProcessor<Windowed<String>, String, Void, Void> joined = processors.get(2);

        firstAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2", 1L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3", 2L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4", 3L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1", 9L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1", 9L)
        );
        secondAgg.checkAndClearProcessResult();
        joined.checkAndClearProcessResult();

        testInputTopic.pipeInput("A", "1", 5L);
        testInputTopic.pipeInput("B", "2", 6L);
        testInputTopic.pipeInput("D", "4", 7L);
        testInputTopic.pipeInput("B", "2", 8L);
        testInputTopic.pipeInput("C", "3", 9L);

        firstAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1+1", 9L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 9L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2", 6L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2", 6L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4+4", 7L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4", 7L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2", 8L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2", 8L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3+3", 9L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5L, 15L)), "0+3", 9L)
        );
        secondAgg.checkAndClearProcessResult();
        joined.checkAndClearProcessResult();

        testInputTopic2.pipeInput("A", "a", 0L);
        testInputTopic2.pipeInput("B", "b", 1L);
        testInputTopic2.pipeInput("C", "c", 2L);
        testInputTopic2.pipeInput("D", "d", 20L);
        testInputTopic2.pipeInput("A", "a", 20L);

        firstAgg.checkAndClearProcessResult();
        secondAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a", 0L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+b", 1L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+c", 2L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(15L, 25L)), "0+d", 20L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(20L, 30L)), "0+d", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(15L, 25L)), "0+a", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(20L, 30L)), "0+a", 20L)
        );
        joined.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1+1%0+a", 9L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2%0+b", 8L),
            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0L, 10L)), "0+3+3%0+c", 9L)
        );

        testInputTopic2.pipeInput("A", "a", 5L);
        testInputTopic2.pipeInput("B", "b", 6L);
        testInputTopic2.pipeInput("D", "d", 7L);
        testInputTopic2.pipeInput("D", "d", 18L);
        testInputTopic2.pipeInput("A", "a", 21L);

        firstAgg.checkAndClearProcessResult();
        secondAgg.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+a+a", 5L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+a", 5L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+b+b", 6L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+b", 6L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+d", 7L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+d", 7L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(10L, 20L)), "0+d", 18L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(15L, 25L)), "0+d+d", 20L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(15L, 25L)), "0+a+a", 21L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(20L, 30L)), "0+a+a", 21L)
        );
        joined.checkAndClearProcessResult(
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1+1%0+a+a", 9L),
            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1%0+a", 9L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+2+2+2%0+b+b", 8L),
            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2%0+b", 8L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(0L, 10L)), "0+4+4%0+d", 7L),
            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5L, 15L)), "0+4%0+d", 7L)
        );
    }

    /**
     * Pipes a single record with a {@code null} key through a windowed aggregation and
     * verifies that the log output captured for {@link KStreamWindowAggregate} contains the
     * "Skipping record due to null key" message for that record.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), setMaterializedCache(Materialized.as("topic1-Canonicalized").withValueSerde(Serdes.String())));

        // Resources are closed in reverse order (driver first, then the appender), each exactly
        // once; a failure in close() is added as suppressed instead of replacing the test failure.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput(null, "1");

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]")
            );
        }
    }

    /**
     * Advances stream time to 100 with the first record, then pipes records with timestamps
     * 0..6 followed by 105 and 106 through a windowed aggregation (10 ms windows advancing by
     * 5 ms, 90 ms grace, 100 ms retention, changelog disabled). Verifies the lateness metrics
     * via {@code assertLatenessMetrics}, the "Skipping record for expired window" log messages
     * for the seven late records, and the records written to the "output" topic for the active
     * emit strategy.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindow() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(90L)).advanceBy(Duration.ofMillis(5L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(
                () -> "",
                MockAggregator.toStringInstance("+"),
                setMaterializedCache(Materialized.as("topic1-Canonicalized").withValueSerde(Serdes.String()).withLoggingDisabled().withRetention(Duration.ofMillis(100L)))
            )
            .toStream()
            .map((windowedKey, value) -> new KeyValue<>(windowedKey.toString(), value))
            .to("output");

        // Resources are closed in reverse order (driver first, then the appender), each exactly
        // once; a failure in close() is added as suppressed instead of replacing the test failure.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new StringSerializer(), new StringSerializer());

            inputTopic.pipeInput("k", "100", 100L);
            inputTopic.pipeInput("k", "0", 0L);
            inputTopic.pipeInput("k", "1", 1L);
            inputTopic.pipeInput("k", "2", 2L);
            inputTopic.pipeInput("k", "3", 3L);
            inputTopic.pipeInput("k", "4", 4L);
            inputTopic.pipeInput("k", "5", 5L);
            inputTopic.pipeInput("k", "6", 6L);
            inputTopic.pipeInput("k", "105", 105L);
            inputTopic.pipeInput("k", "106", 106L);

            assertLatenessMetrics(driver, CoreMatchers.is(7.0), CoreMatchers.is(100.0), CoreMatchers.is(67.9));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItems(
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
                    "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
                )
            );

            final TestOutputTopic<String, String> outputTopic =
                driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

            if (this.emitFinal) {
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@5/15]", "+5+6", (Headers) null, 6L)));
                assertEmittedMetrics(driver, CoreMatchers.is(1.0));
            } else {
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@95/105]", "+100", (Headers) null, 100L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@100/110]", "+100", (Headers) null, 100L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@5/15]", "+5", (Headers) null, 5L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@5/15]", "+5+6", (Headers) null, 6L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@100/110]", "+100+105", (Headers) null, 105L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@105/115]", "+105", (Headers) null, 105L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@100/110]", "+100+105+106", (Headers) null, 106L)));
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@105/115]", "+105+106", (Headers) null, 106L)));
            }
            Assert.assertTrue(outputTopic.isEmpty());
        }
    }

    /**
     * Verifies that records whose window has already expired (window end plus grace period is
     * behind the observed stream time) are skipped, logged, and counted in the task-level
     * dropped-record and lateness metrics.
     *
     * <p>The topology uses 10 ms windows advancing by 10 ms with a 90 ms grace period. The first
     * record (timestamp 200) moves stream time to 200; the seven records that follow carry
     * timestamps 100..105 and 6, and the log assertions below expect each of them to be reported
     * as skipped with {@code expiration=[110]}.
     *
     * <p>Both the log appender and the test driver are managed by try-with-resources, so each is
     * closed exactly once (driver first, then appender) and a failure while closing is attached
     * to the primary exception as suppressed instead of replacing it.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(90L)).advanceBy(Duration.ofMillis(10L)))
            .emitStrategy(this.emitStrategy)
            .aggregate(
                () -> "",
                MockAggregator.toStringInstance("+"),
                setMaterializedCache(Materialized.as("topic1-Canonicalized").withValueSerde(Serdes.String()).withLoggingDisabled()))
            .toStream()
            .map((windowed, str) -> new KeyValue<>(windowed.toString(), str))
            .to("output");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(streamsBuilder.build(), this.props)) {

            TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(AssignmentTestUtils.TOPIC_PREFIX, new StringSerializer(), new StringSerializer());

            // The first record establishes stream time 200; everything after it is late.
            inputTopic.pipeInput("k", "100", 200L);
            inputTopic.pipeInput("k", "0", 100L);
            inputTopic.pipeInput("k", "1", 101L);
            inputTopic.pipeInput("k", "2", 102L);
            inputTopic.pipeInput("k", "3", 103L);
            inputTopic.pipeInput("k", "4", 104L);
            inputTopic.pipeInput("k", "5", 105L);
            inputTopic.pipeInput("k", "6", 6L);

            // 7 dropped records; max lateness 194 (= 200 - 6); average lateness 97.375.
            assertLatenessMetrics(driver, CoreMatchers.is(7.0d), CoreMatchers.is(194.0d), CoreMatchers.is(97.375d));

            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItems(
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
                "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"));

            // Output is only checked when the emit-final strategy is not in use: exactly one
            // update, for the on-time record, is expected.
            if (!this.emitFinal) {
                TestOutputTopic<String, String> outputTopic =
                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
                MatcherAssert.assertThat(outputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord<>("[k@200/210]", "+100", null, 200L)));
                Assert.assertTrue(outputTopic.isEmpty());
            }
        }
    }

    /**
     * Drives the window-aggregate processor directly and checks what is forwarded as stream time
     * advances from 0 to 19 over 10 ms windows advancing by 5 ms with a 5 ms grace period.
     *
     * <p>With the emit-final strategy only the {@code [0,10)} results for keys A and B are
     * expected after stream time reaches 15, and nothing further is expected for the records at
     * timestamps 15 and 19. Otherwise every processed record is expected to forward one update
     * per window it falls into.
     *
     * @throws IOException if the temporary state directory cannot be deleted
     */
    @Test
    public void shouldNotEmitFinalIfNotProgressEnough() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            // NOTE(review): an interval of 0 presumably disables wall-clock throttling of
            // emit-final so every record may trigger emission - confirm against the processor.
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, 10L);
            Processor processor = new KStreamWindowAggregate(windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).get();
            processor.init(context);
            context.setSystemTimeMs(0L);

            processor.process(new Record("A", "1", 0L));
            processor.process(new Record("B", "2", 5L));
            processor.process(new Record("C", "3", 15L));
            if (this.emitFinal) {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(0L, 10L)), new Change("0+2", null), 5L)))));
            } else {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(0L, 10L)), new Change("0+2", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(10L, 20L)), new Change("0+3", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(15L, 25L)), new Change("0+3", null), 15L)))));
            }
            context.resetForwards();

            // Same stream time as before: emit-final must stay silent.
            processor.process(new Record("D", "4", 15L));
            List afterD = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue(afterD.isEmpty());
            } else {
                MatcherAssert.assertThat(afterD, CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(10L, 20L)), new Change("0+4", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(15L, 25L)), new Change("0+4", null), 15L)))));
            }
            context.resetForwards();

            // Stream time moves to 19; emit-final is still expected to forward nothing.
            processor.process(new Record("E", "5", 19L));
            List afterE = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue(afterE.isEmpty());
            } else {
                MatcherAssert.assertThat(afterE, CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("E", new TimeWindow(10L, 20L)), new Change("0+5", null), 19L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("E", new TimeWindow(15L, 25L)), new Change("0+5", null), 19L)))));
            }

            context.getStateStore(WINDOW_STORE_NAME).close();
        } finally {
            // Single cleanup point: runs once whether the test body passed or failed.
            Utils.delete(stateDir);
        }
    }

    /**
     * Drives the window-aggregate processor directly with the internal emit interval set to 0 and
     * checks everything forwarded after five records with timestamps 0, 5, 10, 15 and 20
     * (10 ms windows advancing by 5 ms, 5 ms grace).
     *
     * <p>With the emit-final strategy three results are expected: the final {@code [0,10)} and
     * {@code [5,15)} results for key A and the {@code [5,15)} result for key B. Otherwise one
     * update per record per matching window is expected.
     *
     * @throws IOException if the temporary state directory cannot be deleted
     */
    @Test
    public void shouldEmitWithInterval0() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, 10L);
            Processor processor = new KStreamWindowAggregate(windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).get();
            processor.init(context);
            context.setSystemTimeMs(0L);

            processor.process(new Record("A", "1", 0L));
            processor.process(new Record("A", "1", 5L));
            processor.process(new Record("B", "2", 10L));
            processor.process(new Record("C", "3", 15L));
            processor.process(new Record("D", "4", 20L));

            if (this.emitFinal) {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(5L, 15L)), new Change("0+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 10L)))));
            } else {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(5L, 15L)), new Change("0+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 10L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(10L, 20L)), new Change("0+2", null), 10L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(10L, 20L)), new Change("0+3", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(15L, 25L)), new Change("0+3", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(15L, 25L)), new Change("0+4", null), 20L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(20L, 30L)), new Change("0+4", null), 20L)))));
            }

            context.getStateStore(WINDOW_STORE_NAME).close();
        } finally {
            // Single cleanup point: runs once whether the test body passed or failed.
            Utils.delete(stateDir);
        }
    }

    /**
     * Drives the window-aggregate processor directly with the internal emit interval set to
     * 1000 and checks how forwarding depends on the mocked system time (10 ms windows advancing
     * by 5 ms, 5 ms grace).
     *
     * <p>Expectations encoded below for the emit-final strategy: nothing is forwarded while the
     * system time stays at 0; the closed-window results appear on the first record processed
     * after the system time jumps to 10000; and nothing is forwarded for the next record, which
     * is processed only 100 ms of system time later. Without emit-final, every record forwards
     * one update per matching window regardless of system time.
     *
     * @throws IOException if the temporary state directory cannot be deleted
     */
    @Test
    public void shouldEmitWithLargeInterval() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", 1000L);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, 10L);
            Processor processor = new KStreamWindowAggregate(windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER).get();
            processor.init(context);
            context.setSystemTimeMs(0L);

            processor.process(new Record("A", "1", 0L));
            processor.process(new Record("A", "1", 5L));
            processor.process(new Record("B", "2", 10L));
            processor.process(new Record("C", "3", 15L));
            List beforeInterval = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue(beforeInterval.isEmpty());
            } else {
                MatcherAssert.assertThat(beforeInterval, CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(5L, 15L)), new Change("0+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 10L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(10L, 20L)), new Change("0+2", null), 10L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(10L, 20L)), new Change("0+3", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(15L, 25L)), new Change("0+3", null), 15L)))));
            }
            context.resetForwards();

            // Jump the mocked system time well past the configured interval.
            context.setSystemTimeMs(10000L);
            processor.process(new Record("D", "4", 20L));
            if (this.emitFinal) {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(5L, 15L)), new Change("0+1", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 10L)))));
            } else {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(15L, 25L)), new Change("0+4", null), 20L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(20L, 30L)), new Change("0+4", null), 20L)))));
            }
            context.resetForwards();

            // Only 100 ms of system time later: emit-final is expected to stay silent again.
            context.setSystemTimeMs(10100L);
            processor.process(new Record("E", "5", 40L));
            List withinInterval = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue(withinInterval.isEmpty());
            } else {
                MatcherAssert.assertThat(withinInterval, CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("E", new TimeWindow(35L, 45L)), new Change("0+5", null), 40L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("E", new TimeWindow(40L, 50L)), new Change("0+5", null), 40L)))));
            }

            context.getStateStore(WINDOW_STORE_NAME).close();
        } finally {
            // Single cleanup point: runs once whether the test body passed or failed.
            Utils.delete(stateDir);
        }
    }

    /**
     * Checks that a second processor obtained from the same supplier, and initialised against
     * the same context, continues emitting where the first one stopped (10 ms windows advancing
     * by 5 ms, 5 ms grace, emit interval 0).
     *
     * <p>For the emit-final strategy the first processor is expected to forward the
     * {@code [0,10)} results for A and B; the second processor, after a record at timestamp 25,
     * is expected to forward only the {@code [5,15)} result for B and the {@code [10,20)} result
     * for C, i.e. nothing that was already emitted. Without emit-final every record forwards one
     * update per matching window.
     *
     * @throws IOException if the temporary state directory cannot be deleted
     */
    @Test
    public void shouldEmitFromLastEmitTime() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, 10L);
            KStreamWindowAggregate supplier = new KStreamWindowAggregate(windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);

            Processor firstProcessor = supplier.get();
            firstProcessor.init(context);
            context.setSystemTimeMs(0L);
            firstProcessor.process(new Record("A", "1", 0L));
            firstProcessor.process(new Record("B", "2", 5L));
            firstProcessor.process(new Record("C", "3", 15L));
            if (this.emitFinal) {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(0L, 10L)), new Change("0+2", null), 5L)))));
            } else {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("A", new TimeWindow(0L, 10L)), new Change("0+1", null), 0L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(0L, 10L)), new Change("0+2", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(10L, 20L)), new Change("0+3", null), 15L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(15L, 25L)), new Change("0+3", null), 15L)))));
            }
            context.resetForwards();

            // A fresh processor instance from the same supplier, initialised with the same context.
            Processor secondProcessor = supplier.get();
            secondProcessor.init(context);
            secondProcessor.process(new Record("D", "4", 25L));
            if (this.emitFinal) {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("B", new TimeWindow(5L, 15L)), new Change("0+2", null), 5L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("C", new TimeWindow(10L, 20L)), new Change("0+3", null), 15L)))));
            } else {
                MatcherAssert.assertThat(context.forwarded(), CoreMatchers.is(Arrays.asList(
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(20L, 30L)), new Change("0+4", null), 25L)),
                    new MockProcessorContext.CapturedForward(new Record(new Windowed("D", new TimeWindow(25L, 35L)), new Change("0+4", null), 25L)))));
            }
            context.resetForwards();

            context.getStateStore(WINDOW_STORE_NAME).close();
        } finally {
            // Single cleanup point: runs once whether the test body passed or failed.
            Utils.delete(stateDir);
        }
    }

    /**
     * Constructing the aggregate with {@link UnlimitedWindows} must be rejected with an
     * {@link IllegalArgumentException} when the emit-final strategy is in use, and must succeed
     * otherwise.
     */
    @Test
    public void showThrowIfEmitFinalUsedWithUnlimitedWindow() {
        if (!this.emitFinal) {
            // No assertion needed: the constructor simply must not throw.
            new KStreamWindowAggregate(UnlimitedWindows.of(), WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
            return;
        }

        IllegalArgumentException thrown = Assert.assertThrows(
            IllegalArgumentException.class,
            () -> new KStreamWindowAggregate(UnlimitedWindows.of(), WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("ON_WINDOW_CLOSE strategy is only supported for TimeWindows and SlidingWindows for TimeWindowedKStream"));
    }

    /**
     * Builds (but does not initialise) the timestamped window store used by the processor-level
     * tests, with logging and caching disabled and a retention of one day.
     *
     * <p>The backing supplier depends on the emit strategy under test: the indexed time-ordered
     * RocksDB supplier when emit-final is enabled, the regular persistent timestamped window
     * store otherwise.
     *
     * @param j window size in milliseconds
     * @return a new, uninitialised store named {@code WINDOW_STORE_NAME}
     */
    private TimestampedWindowStore<String, String> getWindowStore(long j) {
        final Duration retention = Duration.ofDays(1L);
        final Duration windowSize = Duration.ofMillis(j);
        return Stores.timestampedWindowStoreBuilder(
                this.emitFinal
                    ? RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(WINDOW_STORE_NAME, retention, windowSize, false, false)
                    : Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, retention, windowSize, false),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled()
            .withCachingDisabled()
            .build();
    }

    /**
     * Creates a mock processor context for task {@code 0_0} rooted at the given state directory,
     * with a freshly built window store already initialised and registered on it.
     *
     * @param file state directory handed to the mock context
     * @param j window size in milliseconds, passed on to {@code getWindowStore}
     * @return the prepared context, with its current node set to a node named {@code testNode}
     */
    private MockInternalNewProcessorContext<Windowed<String>, Change<String>> makeContext(File file, long j) {
        final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context =
            new MockInternalNewProcessorContext<>(this.props, new TaskId(0, 0), file);
        context.setCurrentNode(new ProcessorNode<>("testNode"));

        // The store acts as its own root store; no restore callback is supplied.
        final TimestampedWindowStore<String, String> store = getWindowStore(j);
        store.init(context.getStateStoreContext(), store);
        context.getStateStoreContext().register(store, (StateRestoreCallback) null);
        return context;
    }

    /**
     * Asserts the task-level dropped-record and record-lateness metrics reported by the driver
     * for task {@code 0_0} on this test's thread.
     *
     * <p>The dropped-records rate is only required to be different from {@code 0.0}.
     *
     * @param topologyTestDriver driver whose metrics are inspected
     * @param matcher expectation for {@code dropped-records-total}
     * @param matcher2 expectation for {@code record-lateness-max}
     * @param matcher3 expectation for {@code record-lateness-avg}
     */
    private void assertLatenessMetrics(TopologyTestDriver topologyTestDriver, Matcher<Object> matcher, Matcher<Object> matcher2, Matcher<Object> matcher3) {
        final String group = "stream-task-metrics";
        final Map<String, String> taskTags = Utils.mkMap(Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", "0_0"));

        final MetricName droppedTotal = new MetricName("dropped-records-total", group, "The total number of dropped records", taskTags);
        final MetricName droppedRate = new MetricName("dropped-records-rate", group, "The average number of dropped records per second", taskTags);
        final MetricName latenessMax = new MetricName("record-lateness-max", group, "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", taskTags);
        final MetricName latenessAvg = new MetricName("record-lateness-avg", group, "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", taskTags);

        final Map<MetricName, ? extends Metric> metrics = topologyTestDriver.metrics();
        MatcherAssert.assertThat(metrics.get(droppedTotal).metricValue(), matcher);
        MatcherAssert.assertThat(metrics.get(droppedRate).metricValue(), CoreMatchers.not(0.0d));
        MatcherAssert.assertThat(metrics.get(latenessMax).metricValue(), matcher2);
        MatcherAssert.assertThat(metrics.get(latenessAvg).metricValue(), matcher3);
    }

    /**
     * Asserts the emit-final metrics reported by the driver for processor node
     * {@code KSTREAM-AGGREGATE-0000000001} of task {@code 0_0} on this test's thread.
     *
     * <p>The emit rate is only required to be different from {@code 0.0}.
     *
     * @param topologyTestDriver driver whose metrics are inspected
     * @param matcher expectation for {@code window-aggregate-final-emit-total}
     */
    private void assertEmittedMetrics(TopologyTestDriver topologyTestDriver, Matcher<Object> matcher) {
        final String group = "stream-processor-node-metrics";
        final Map<String, String> nodeTags = Utils.mkMap(
            Utils.mkEntry("thread-id", this.threadId),
            Utils.mkEntry("task-id", "0_0"),
            Utils.mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001"));

        final MetricName emitTotal = new MetricName("window-aggregate-final-emit-total", group, "The total number of emit final records", nodeTags);
        final MetricName emitRate = new MetricName("window-aggregate-final-emit-rate", group, "The average number of emit final records per second", nodeTags);

        final Map<MetricName, ? extends Metric> metrics = topologyTestDriver.metrics();
        MatcherAssert.assertThat(metrics.get(emitTotal).metricValue(), matcher);
        MatcherAssert.assertThat(metrics.get(emitRate).metricValue(), CoreMatchers.not(0.0d));
    }

    /**
     * Applies this test run's caching setting to the given materialization.
     *
     * @param materialized the materialization to configure
     * @return the result of {@code withCachingEnabled()} when {@code withCache} is set,
     *         otherwise the result of {@code withCachingDisabled()}
     */
    private <K, V, S extends StateStore> Materialized<K, V, S> setMaterializedCache(Materialized<K, V, S> materialized) {
        if (this.withCache) {
            return materialized.withCachingEnabled();
        }
        return materialized.withCachingDisabled();
    }
}
