package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.class */
public class KStreamKStreamJoinTest {
    private final String topic1 = AssignmentTestUtils.TP_1_NAME;
    private final String topic2 = AssignmentTestUtils.TP_2_NAME;
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(50), Duration.ofMillis(50));
    private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
    private final String errorMessagePrefix = "Window settings mismatch. WindowBytesStoreSupplier settings";
    private final String expectedTopologyWithUserNamedRepartitionTopics = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> second-join-left-repartition-filter, first-join-left-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: first-join-left-repartition-filter (stores: [])\n      --> first-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: second-join-left-repartition-filter (stores: [])\n      --> second-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n      <-- first-join-left-repartition-filter\n    Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n      <-- second-join-left-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> first-join-other-windowed\n    Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n      --> first-join-this-windowed\n    Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-other-join\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-this-join\n      <-- first-join-left-repartition-source\n    Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-merge\n      <-- first-join-other-windowed\n    Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-merge\n      <-- first-join-this-windowed\n    Processor: first-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- first-join-this-join, first-join-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- first-join-merge\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> second-join-other-windowed\n    Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n      --> second-join-this-windowed\n    Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-other-join\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-this-join\n      <-- second-join-left-repartition-source\n    Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-merge\n      <-- second-join-other-windowed\n    Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-merge\n      <-- second-join-this-windowed\n    Processor: second-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- second-join-this-join, second-join-other-join\n    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n      <-- second-join-merge\n\n";
    private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-FILTER-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-MAP-0000000003\n    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n      <-- KSTREAM-FILTER-0000000005\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n      --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> KSTREAM-WINDOWED-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-JOINTHIS-0000000009\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-JOINOTHER-0000000010\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-JOINTHIS-0000000018\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-JOINOTHER-0000000019\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000008\n    Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000007\n    Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000016\n    Processor: KSTREAM-MERGE-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n    Processor: KSTREAM-MERGE-0000000020 (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- KSTREAM-MERGE-0000000011\n    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n      <-- KSTREAM-MERGE-0000000020\n\n";

    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVersionLatest() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())).join(streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer())), (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()));
        this.props.setProperty("built.in.metrics.version", "latest");
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(KStreamKStreamJoin.class);
        Throwable th = null;
        try {
            TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
            Throwable th2 = null;
            try {
                try {
                    topologyTestDriver.createInputTopic("left", new StringSerializer(), new IntegerSerializer()).pipeInput("A", (Object) null);
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key or value. topic=[left] partition=[0] offset=[0]"));
                    if (topologyTestDriver != null) {
                        if (0 != 0) {
                            try {
                                topologyTestDriver.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            topologyTestDriver.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (topologyTestDriver != null) {
                    if (th2 != null) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldReuseRepartitionTopicWithGeneratedName() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", "none");
        KStream stream = streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()));
        KStream stream2 = streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KStream stream3 = streamsBuilder.stream(AssignmentTestUtils.TP_3_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KStream map = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        });
        map.join(stream2, (str3, str4) -> {
            return str3 + str4;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L))).to("out-one");
        map.join(stream3, (str5, str6) -> {
            return str5 + str6;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L))).to("out-to");
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> KSTREAM-FILTER-0000000005\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000004\n      <-- KSTREAM-MAP-0000000003\n    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-MAP-0000000003-repartition)\n      <-- KSTREAM-FILTER-0000000005\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-MAP-0000000003-repartition])\n      --> KSTREAM-WINDOWED-0000000007, KSTREAM-WINDOWED-0000000016\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> KSTREAM-WINDOWED-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-WINDOWED-0000000007 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-JOINTHIS-0000000009\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000008 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-JOINOTHER-0000000010\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-WINDOWED-0000000016 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-JOINTHIS-0000000018\n      <-- KSTREAM-SOURCE-0000000006\n    Processor: KSTREAM-WINDOWED-0000000017 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-JOINOTHER-0000000019\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: KSTREAM-JOINOTHER-0000000010 (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000008\n    Processor: KSTREAM-JOINOTHER-0000000019 (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000017\n    Processor: KSTREAM-JOINTHIS-0000000009 (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> KSTREAM-MERGE-0000000011\n      <-- KSTREAM-WINDOWED-0000000007\n    Processor: KSTREAM-JOINTHIS-0000000018 (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> KSTREAM-MERGE-0000000020\n      <-- KSTREAM-WINDOWED-0000000016\n    Processor: KSTREAM-MERGE-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-JOINTHIS-0000000009, KSTREAM-JOINOTHER-0000000010\n    Processor: KSTREAM-MERGE-0000000020 (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- KSTREAM-JOINTHIS-0000000018, KSTREAM-JOINOTHER-0000000019\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- KSTREAM-MERGE-0000000011\n    Sink: KSTREAM-SINK-0000000021 (topic: out-to)\n      <-- KSTREAM-MERGE-0000000020\n\n", streamsBuilder.build(properties).describe().toString());
    }

    @Test
    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", "none");
        KStream stream = streamsBuilder.stream(AssignmentTestUtils.TOPIC_PREFIX, Consumed.with(Serdes.String(), Serdes.String()));
        KStream stream2 = streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KStream stream3 = streamsBuilder.stream(AssignmentTestUtils.TP_3_NAME, Consumed.with(Serdes.String(), Serdes.String()));
        KStream map = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        });
        StreamJoined with = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
        map.join(stream2, (str3, str4) -> {
            return str3 + str4;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), with.withName("first-join")).to("out-one");
        map.join(stream3, (str5, str6) -> {
            return str5 + str6;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), with.withName("second-join")).to("out-two");
        Topology build = streamsBuilder.build(properties);
        System.out.println(build.describe().toString());
        Assert.assertEquals("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> second-join-left-repartition-filter, first-join-left-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: first-join-left-repartition-filter (stores: [])\n      --> first-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: second-join-left-repartition-filter (stores: [])\n      --> second-join-left-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: first-join-left-repartition-sink (topic: first-join-left-repartition)\n      <-- first-join-left-repartition-filter\n    Sink: second-join-left-repartition-sink (topic: second-join-left-repartition)\n      <-- second-join-left-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [topic2])\n      --> first-join-other-windowed\n    Source: first-join-left-repartition-source (topics: [first-join-left-repartition])\n      --> first-join-this-windowed\n    Processor: first-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-other-join\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: first-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-this-join\n      <-- first-join-left-repartition-source\n    Processor: first-join-other-join (stores: [KSTREAM-JOINTHIS-0000000009-store])\n      --> first-join-merge\n      <-- first-join-other-windowed\n    Processor: first-join-this-join (stores: [KSTREAM-JOINOTHER-0000000010-store])\n      --> first-join-merge\n      <-- first-join-this-windowed\n    Processor: first-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- first-join-this-join, first-join-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: out-one)\n      <-- first-join-merge\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000002 (topics: [topic3])\n      --> second-join-other-windowed\n    Source: second-join-left-repartition-source (topics: [second-join-left-repartition])\n      --> second-join-this-windowed\n    Processor: second-join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-other-join\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: second-join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-this-join\n      <-- second-join-left-repartition-source\n    Processor: second-join-other-join (stores: [KSTREAM-JOINTHIS-0000000018-store])\n      --> second-join-merge\n      <-- second-join-other-windowed\n    Processor: second-join-this-join (stores: [KSTREAM-JOINOTHER-0000000019-store])\n      --> second-join-merge\n      <-- second-join-this-windowed\n    Processor: second-join-merge (stores: [])\n      --> KSTREAM-SINK-0000000021\n      <-- second-join-this-join, second-join-other-join\n    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n      <-- second-join-merge\n\n", build.describe().toString());
    }

    @Test
    public void shouldDisableLoggingOnStreamJoined() {
        JoinWindows ofTimeDifferenceAndGrace = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(50L));
        StreamJoined withLoggingDisabled = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())).join(streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer())), (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, ofTimeDifferenceAndGrace, withLoggingDisabled);
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        MatcherAssert.assertThat(Boolean.valueOf(((InternalTopologyBuilder.StateStoreFactory) internalTopologyBuilder.stateStores().get("store-this-join-store")).loggingEnabled()), CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(Boolean.valueOf(((InternalTopologyBuilder.StateStoreFactory) internalTopologyBuilder.stateStores().get("store-other-join-store")).loggingEnabled()), CoreMatchers.equalTo(false));
    }

    @Test
    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
        JoinWindows ofTimeDifferenceAndGrace = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(50L));
        StreamJoined withLoggingEnabled = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.singletonMap("test", "property"));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())).join(streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer())), (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, ofTimeDifferenceAndGrace, withLoggingEnabled);
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        internalTopologyBuilder.buildSubtopology(0);
        MatcherAssert.assertThat(Boolean.valueOf(((InternalTopologyBuilder.StateStoreFactory) internalTopologyBuilder.stateStores().get("store-this-join-store")).loggingEnabled()), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(Boolean.valueOf(((InternalTopologyBuilder.StateStoreFactory) internalTopologyBuilder.stateStores().get("store-other-join-store")).loggingEnabled()), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(Integer.valueOf(((InternalTopologyBuilder.TopicsInfo) internalTopologyBuilder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).stateChangelogTopics.size()), CoreMatchers.equalTo(2));
        Iterator it = ((InternalTopologyBuilder.TopicsInfo) internalTopologyBuilder.subtopologyToTopicsInfo().get(AssignmentTestUtils.SUBTOPOLOGY_0)).stateChangelogTopics.values().iterator();
        while (it.hasNext()) {
            MatcherAssert.assertThat(((InternalTopicConfig) it.next()).getProperties(Collections.emptyMap(), 0L).get("test"), CoreMatchers.equalTo("property"));
        }
    }

    @Test
    public void shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 500L, 100L, true);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true)), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionThisStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 150L, true);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true)), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionWhenThisJoinStoreSetsRetainDuplicatesFalse() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, false);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, true)), this.joinWindows, "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
    }

    @Test
    public void shouldThrowExceptionOtherStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 500L, 100L, true)), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionOtherStoreSupplierWindowSizeDoesNotMatchJoinWindowsWindowSize() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 150L, true)), this.joinWindows, "Window settings mismatch. WindowBytesStoreSupplier settings");
    }

    @Test
    public void shouldThrowExceptionWhenOtherJoinStoreSetsRetainDuplicatesFalse() {
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        buildStreamsJoinThatShouldThrow(this.streamJoined.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store-other", 150L, 100L, false)), this.joinWindows, "The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
    }

    @Test
    public void shouldBuildJoinWithCustomStoresAndCorrectWindowSettings() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer())).join(streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer())), (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, this.joinWindows, this.streamJoined);
        streamsBuilder.build();
    }

    @Test
    public void shouldExceptionWhenJoinStoresDoNotHaveUniqueNames() {
        JoinWindows ofTimeDifferenceAndGrace = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(50L));
        StreamJoined with = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
        WindowBytesStoreSupplier buildWindowBytesStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
        buildStreamsJoinThatShouldThrow(with.withThisStoreSupplier(buildWindowBytesStoreSupplier).withOtherStoreSupplier(buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true)), ofTimeDifferenceAndGrace, "Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
    }

    @Test
    public void shouldJoinWithCustomStoreSuppliers() {
        JoinWindows ofTimeDifferenceWithNoGrace = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L));
        WindowBytesStoreSupplier inMemoryWindowStore = Stores.inMemoryWindowStore("in-memory-join-store", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true);
        WindowBytesStoreSupplier inMemoryWindowStore2 = Stores.inMemoryWindowStore("in-memory-join-store-other", Duration.ofMillis(ofTimeDifferenceWithNoGrace.size() + ofTimeDifferenceWithNoGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceWithNoGrace.size()), true);
        StreamJoined with = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
        runJoin(with.withThisStoreSupplier(inMemoryWindowStore).withOtherStoreSupplier(inMemoryWindowStore2), ofTimeDifferenceWithNoGrace);
        runJoin(with.withThisStoreSupplier(inMemoryWindowStore), ofTimeDifferenceWithNoGrace);
        runJoin(with.withOtherStoreSupplier(inMemoryWindowStore2), ofTimeDifferenceWithNoGrace);
    }

    @Test
    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
        Processor processor = new KStreamKStreamJoin(false, "other", new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1000L))), (str, str2, str3) -> {
            return str2 + str3;
        }, true, Optional.of("outer"), new KStreamImplJoin.TimeTrackerSupplier()).get();
        MockInternalNewProcessorContext mockInternalNewProcessorContext = new MockInternalNewProcessorContext();
        WindowStore build = new WindowStoreBuilder(new InMemoryWindowBytesStoreSupplier("other", 1000L, 100L, false), Serdes.String(), Serdes.String(), new MockTime()).build();
        KeyValueStore keyValueStore = (KeyValueStore) Mockito.spy(new KeyValueStoreBuilder(new InMemoryKeyValueBytesStoreSupplier("outer"), new TimestampedKeyAndJoinSideSerde(Serdes.String()), new LeftOrRightValueSerde(Serdes.String(), Serdes.String()), new MockTime()).build());
        GenericInMemoryKeyValueStore genericInMemoryKeyValueStore = new GenericInMemoryKeyValueStore("root");
        build.init(mockInternalNewProcessorContext, genericInMemoryKeyValueStore);
        mockInternalNewProcessorContext.addStateStore(build);
        keyValueStore.init(mockInternalNewProcessorContext, genericInMemoryKeyValueStore);
        mockInternalNewProcessorContext.addStateStore(keyValueStore);
        processor.init(mockInternalNewProcessorContext);
        Record record = new Record("key1", "value1", 10000L);
        Record record2 = new Record("key2", "value2", 13000L);
        Record record3 = new Record("key3", "value3", 15000L);
        Record record4 = new Record("key4", "value4", 17000L);
        mockInternalNewProcessorContext.setSystemTimeMs(1000L);
        processor.process(record);
        mockInternalNewProcessorContext.setSystemTimeMs(2100L);
        processor.process(record2);
        mockInternalNewProcessorContext.setSystemTimeMs(2500L);
        processor.process(record3);
        Assert.assertEquals(2L, iteratorToList(keyValueStore.all()).size());
        mockInternalNewProcessorContext.setSystemTimeMs(4000L);
        processor.process(record4);
        Assert.assertEquals(1L, iteratorToList(keyValueStore.all()).size());
    }

    private <T> List<T> iteratorToList(Iterator<T> it) {
        return (List) StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 16), false).collect(Collectors.toList());
    }

    private void runJoin(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        KStream stream2 = streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        stream.join(stream2, (v0, v1) -> {
            return Integer.sum(v0, v1);
        }, joinWindows, streamJoined).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("left", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("right", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                createInputTopic.pipeInput("A", 1, 1L);
                createInputTopic.pipeInput("B", 1, 2L);
                createInputTopic2.pipeInput("A", 1, 1L);
                createInputTopic2.pipeInput("B", 2, 2L);
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>("A", 2, 1L), new KeyValueTimestamp<>("B", 3, 2L));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).join(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i2 = 0; i2 < 2; i2++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i2]), "a" + iArr[i2]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
                for (int i3 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i3), "B" + i3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0L), new KeyValueTimestamp<>(1, "B1+a1", 0L));
                for (int i4 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i4), "b" + i4);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0L), new KeyValueTimestamp<>(0, "B0+b0", 0L), new KeyValueTimestamp<>(1, "A1+b1", 0L), new KeyValueTimestamp<>(1, "B1+b1", 0L), new KeyValueTimestamp<>(2, "B2+b2", 0L), new KeyValueTimestamp<>(3, "B3+b3", 0L));
                for (int i5 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i5), "C" + i5);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0L), new KeyValueTimestamp<>(0, "C0+b0", 0L), new KeyValueTimestamp<>(1, "C1+a1", 0L), new KeyValueTimestamp<>(1, "C1+b1", 0L), new KeyValueTimestamp<>(2, "C2+b2", 0L), new KeyValueTimestamp<>(3, "C3+b3", 0L));
                for (int i6 = 0; i6 < 2; i6++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i6]), "c" + iArr[i6]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 0L), new KeyValueTimestamp<>(0, "B0+c0", 0L), new KeyValueTimestamp<>(0, "C0+c0", 0L), new KeyValueTimestamp<>(1, "A1+c1", 0L), new KeyValueTimestamp<>(1, "B1+c1", 0L), new KeyValueTimestamp<>(1, "C1+c1", 0L));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOuterJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).outerJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofHours(24L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i2 = 0; i2 < 2; i2++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i2]), "a" + iArr[i2]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
                for (int i3 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i3), "B" + i3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0L), new KeyValueTimestamp<>(1, "B1+a1", 0L));
                for (int i4 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i4), "b" + i4);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0L), new KeyValueTimestamp<>(0, "B0+b0", 0L), new KeyValueTimestamp<>(1, "A1+b1", 0L), new KeyValueTimestamp<>(1, "B1+b1", 0L), new KeyValueTimestamp<>(2, "B2+b2", 0L), new KeyValueTimestamp<>(3, "B3+b3", 0L));
                for (int i5 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i5), "C" + i5);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0L), new KeyValueTimestamp<>(0, "C0+b0", 0L), new KeyValueTimestamp<>(1, "C1+a1", 0L), new KeyValueTimestamp<>(1, "C1+b1", 0L), new KeyValueTimestamp<>(2, "C2+b2", 0L), new KeyValueTimestamp<>(3, "C3+b3", 0L));
                for (int i6 = 0; i6 < 2; i6++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i6]), "c" + iArr[i6]);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 0L), new KeyValueTimestamp<>(0, "B0+c0", 0L), new KeyValueTimestamp<>(0, "C0+c0", 0L), new KeyValueTimestamp<>(1, "A1+c1", 0L), new KeyValueTimestamp<>(1, "B1+c1", 0L), new KeyValueTimestamp<>(1, "C1+c1", 0L));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).join(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 0L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i2 = 0; i2 < 2; i2++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i2]), "a" + iArr[i2], 0L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
                for (int i3 = 0; i3 < iArr.length; i3++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i3]), "B" + iArr[i3], 1000 + i3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                long j = 1000 + 100;
                for (int i4 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i4), "b" + i4, j);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+b0", 1100L), new KeyValueTimestamp<>(1, "B1+b1", 1100L), new KeyValueTimestamp<>(2, "B2+b2", 1100L), new KeyValueTimestamp<>(3, "B3+b3", 1100L));
                long j2 = j + 1;
                for (int i5 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i5), "c" + i5, j2);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "B1+c1", 1101L), new KeyValueTimestamp<>(2, "B2+c2", 1101L), new KeyValueTimestamp<>(3, "B3+c3", 1101L));
                long j3 = j2 + 1;
                for (int i6 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i6), "d" + i6, j3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "B2+d2", 1102L), new KeyValueTimestamp<>(3, "B3+d3", 1102L));
                long j4 = j3 + 1;
                for (int i7 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i7), "e" + i7, j4);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "B3+e3", 1103L));
                long j5 = j4 + 1;
                for (int i8 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i8), "f" + i8, j5);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i9 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i9), "g" + i9, 899L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                long j6 = 899 + 1;
                for (int i10 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i10), "h" + i10, j6);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+h0", 1000L));
                long j7 = j6 + 1;
                for (int i11 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i11), "i" + i11, j7);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+i0", 1000L), new KeyValueTimestamp<>(1, "B1+i1", 1001L));
                long j8 = j7 + 1;
                for (int i12 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i12), "j" + i12, j8);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+j0", 1000L), new KeyValueTimestamp<>(1, "B1+j1", 1001L), new KeyValueTimestamp<>(2, "B2+j2", 1002L));
                long j9 = j8 + 1;
                for (int i13 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i13), "k" + i13, j9);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+k0", 1000L), new KeyValueTimestamp<>(1, "B1+k1", 1001L), new KeyValueTimestamp<>(2, "B2+k2", 1002L), new KeyValueTimestamp<>(3, "B3+k3", 1003L));
                for (int i14 = 0; i14 < iArr.length; i14++) {
                    createInputTopic2.pipeInput(Integer.valueOf(iArr[i14]), "l" + iArr[i14], 2000 + i14);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i15 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i15), "C" + i15, 2100L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+l0", 2100L), new KeyValueTimestamp<>(1, "C1+l1", 2100L), new KeyValueTimestamp<>(2, "C2+l2", 2100L), new KeyValueTimestamp<>(3, "C3+l3", 2100L));
                long j10 = 2100 + 1;
                for (int i16 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i16), "D" + i16, j10);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "D1+l1", 2101L), new KeyValueTimestamp<>(2, "D2+l2", 2101L), new KeyValueTimestamp<>(3, "D3+l3", 2101L));
                long j11 = j10 + 1;
                for (int i17 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i17), "E" + i17, j11);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "E2+l2", 2102L), new KeyValueTimestamp<>(3, "E3+l3", 2102L));
                long j12 = j11 + 1;
                for (int i18 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i18), "F" + i18, j12);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "F3+l3", 2103L));
                long j13 = j12 + 1;
                for (int i19 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i19), "G" + i19, j13);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i20 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i20), "H" + i20, 1899L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                long j14 = 1899 + 1;
                for (int i21 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i21), "I" + i21, j14);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "I0+l0", 2000L));
                long j15 = j14 + 1;
                for (int i22 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i22), "J" + i22, j15);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "J0+l0", 2000L), new KeyValueTimestamp<>(1, "J1+l1", 2001L));
                long j16 = j15 + 1;
                for (int i23 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i23), "K" + i23, j16);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "K0+l0", 2000L), new KeyValueTimestamp<>(1, "K1+l1", 2001L), new KeyValueTimestamp<>(2, "K2+l2", 2002L));
                long j17 = j16 + 1;
                for (int i24 : iArr) {
                    createInputTopic.pipeInput(Integer.valueOf(i24), "L" + i24, j17);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "L0+l0", 2000L), new KeyValueTimestamp<>(1, "L1+l1", 2001L), new KeyValueTimestamp<>(2, "L2+l2", 2002L), new KeyValueTimestamp<>(3, "L3+l3", 2003L));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAsymmetricWindowingAfter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).join(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(0L)).after(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < iArr.length; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 1000 + i);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i2 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i2), "a" + i2, 999L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                long j = 999 + 1;
                for (int i3 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i3), "b" + i3, j);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 1000L));
                long j2 = j + 1;
                for (int i4 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i4), "c" + i4, j2);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 1001L), new KeyValueTimestamp<>(1, "A1+c1", 1001L));
                long j3 = j2 + 1;
                for (int i5 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i5), "d" + i5, j3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+d0", 1002L), new KeyValueTimestamp<>(1, "A1+d1", 1002L), new KeyValueTimestamp<>(2, "A2+d2", 1002L));
                long j4 = j3 + 1;
                for (int i6 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i6), "e" + i6, j4);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+e0", 1003L), new KeyValueTimestamp<>(1, "A1+e1", 1003L), new KeyValueTimestamp<>(2, "A2+e2", 1003L), new KeyValueTimestamp<>(3, "A3+e3", 1003L));
                for (int i7 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i7), "f" + i7, 1100L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+f0", 1100L), new KeyValueTimestamp<>(1, "A1+f1", 1100L), new KeyValueTimestamp<>(2, "A2+f2", 1100L), new KeyValueTimestamp<>(3, "A3+f3", 1100L));
                long j5 = 1100 + 1;
                for (int i8 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i8), "g" + i8, j5);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+g1", 1101L), new KeyValueTimestamp<>(2, "A2+g2", 1101L), new KeyValueTimestamp<>(3, "A3+g3", 1101L));
                long j6 = j5 + 1;
                for (int i9 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i9), "h" + i9, j6);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+h2", 1102L), new KeyValueTimestamp<>(3, "A3+h3", 1102L));
                long j7 = j6 + 1;
                for (int i10 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i10), "i" + i10, j7);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "A3+i3", 1103L));
                long j8 = j7 + 1;
                for (int i11 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i11), "j" + i11, j8);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAsymmetricWindowingBefore() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).join(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(0L)).before(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
                MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < iArr.length; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 1000 + i);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                for (int i2 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i2), "a" + i2, 899L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                long j = 899 + 1;
                for (int i3 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i3), "b" + i3, j);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 1000L));
                long j2 = j + 1;
                for (int i4 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i4), "c" + i4, j2);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+c0", 1000L), new KeyValueTimestamp<>(1, "A1+c1", 1001L));
                long j3 = j2 + 1;
                for (int i5 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i5), "d" + i5, j3);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+d0", 1000L), new KeyValueTimestamp<>(1, "A1+d1", 1001L), new KeyValueTimestamp<>(2, "A2+d2", 1002L));
                long j4 = j3 + 1;
                for (int i6 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i6), "e" + i6, j4);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+e0", 1000L), new KeyValueTimestamp<>(1, "A1+e1", 1001L), new KeyValueTimestamp<>(2, "A2+e2", 1002L), new KeyValueTimestamp<>(3, "A3+e3", 1003L));
                for (int i7 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i7), "f" + i7, 1000L);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+f0", 1000L), new KeyValueTimestamp<>(1, "A1+f1", 1001L), new KeyValueTimestamp<>(2, "A2+f2", 1002L), new KeyValueTimestamp<>(3, "A3+f3", 1003L));
                long j5 = 1000 + 1;
                for (int i8 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i8), "g" + i8, j5);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+g1", 1001L), new KeyValueTimestamp<>(2, "A2+g2", 1002L), new KeyValueTimestamp<>(3, "A3+g3", 1003L));
                long j6 = j5 + 1;
                for (int i9 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i9), "h" + i9, j6);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+h2", 1002L), new KeyValueTimestamp<>(3, "A3+h3", 1003L));
                long j7 = j6 + 1;
                for (int i10 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i10), "i" + i10, j7);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "A3+i3", 1003L));
                long j8 = j7 + 1;
                for (int i11 : iArr) {
                    createInputTopic2.pipeInput(Integer.valueOf(i11), "j" + i11, j8);
                }
                theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    private void buildStreamsJoinThatShouldThrow(StreamJoined<String, Integer, Integer> streamJoined, JoinWindows joinWindows, String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        KStream stream2 = streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        Assert.assertTrue(Assert.assertThrows(StreamsException.class, () -> {
            stream.join(stream2, (v0, v1) -> {
                return Integer.sum(v0, v1);
            }, joinWindows, streamJoined);
        }).getMessage().startsWith(str));
    }

    private WindowBytesStoreSupplier buildWindowBytesStoreSupplier(String str, long j, long j2, boolean z) {
        return Stores.inMemoryWindowStore(str, Duration.ofMillis(j), Duration.ofMillis(j2), z);
    }
}
