package org.apache.flink.streaming.runtime.operators.windowing;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.class */
class MergingWindowSetTest {

    /**
     * Test-only merging assigner.
     *
     * <p>{@link #assignWindows} produces a single window {@code [timestamp, timestamp +
     * sessionTimeout)}. {@link #mergeWindows} only looks at the group of windows that start inside
     * the earliest window and reports a merge result whose end is one past the earliest window's
     * end, i.e. the merge result is not simply the span of the merged windows.
     */
    static class NonEagerlyMergingWindowAssigner extends MergingWindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        /** Length added to the element timestamp to obtain the end of an assigned window. */
        protected long sessionTimeout;

        /**
         * @param sessionTimeout length of the window assigned to each element
         */
        public NonEagerlyMergingWindowAssigner(long sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        /**
         * Assigns exactly one window starting at {@code timestamp} and ending at {@code timestamp +
         * sessionTimeout}; the element and the context are not consulted.
         */
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssigner.WindowAssignerContext context) {
            return Collections.singletonList(new TimeWindow(timestamp, timestamp + this.sessionTimeout));
        }

        /** Returns a freshly created {@link EventTimeTrigger}. */
        public Trigger<Object, TimeWindow> getDefaultTrigger() {
            return EventTimeTrigger.create();
        }

        /** Returns a new {@link TimeWindow.Serializer}; the execution config is ignored. */
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        /** Always {@code true}. */
        public boolean isEventTime() {
            return true;
        }

        /**
         * Merges the windows that start within the earliest window.
         *
         * <p>The earliest window is the one with the smallest start (the first one encountered wins
         * on ties). Every window whose start lies in {@code [earliest.start, earliest.end)} belongs
         * to the merge group. If the group has more than one member, {@code callback.merge} is
         * invoked once with the group and the target {@code [earliest.start, earliest.end + 1)}.
         * An empty input results in no callback.
         *
         * @param windows candidate windows
         * @param callback receives the merge group and the merge result, if any
         */
        public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> callback) {
            TimeWindow earliest = null;
            for (TimeWindow window : windows) {
                if (earliest == null || window.getStart() < earliest.getStart()) {
                    earliest = window;
                }
            }
            if (earliest == null) {
                // No windows at all: nothing can be merged (avoids dereferencing null below).
                return;
            }

            List<TimeWindow> mergeGroup = new ArrayList<>();
            for (TimeWindow window : windows) {
                if (window.getStart() >= earliest.getStart() && window.getStart() < earliest.getEnd()) {
                    mergeGroup.add(window);
                }
            }

            // The merge result deliberately extends one past the earliest window's end.
            TimeWindow mergeResult = new TimeWindow(earliest.getStart(), earliest.getEnd() + 1);
            if (mergeGroup.size() > 1) {
                callback.merge(mergeGroup, mergeResult);
            }
        }
    }

    /**
     * {@link MergingWindowSet.MergeFunction} that records the arguments of the merge call it
     * receives so that tests can inspect them afterwards.
     *
     * <p>At most one merge may be recorded between two calls to {@link #reset()}; a second merge
     * fails an assertion inside {@link #merge}.
     */
    private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
        private TimeWindow target = null;
        private Collection<TimeWindow> sources = null;
        private TimeWindow stateWindow = null;
        private Collection<TimeWindow> mergedStateWindows = null;

        /** Forgets the recorded merge, if any. */
        public void reset() {
            this.target = null;
            this.sources = null;
            this.stateWindow = null;
            this.mergedStateWindows = null;
        }

        /** Returns whether a merge (with a non-null target) was recorded since the last reset. */
        public boolean hasMerged() {
            return this.target != null;
        }

        /** Returns the first argument of the recorded merge, or {@code null}. */
        public TimeWindow mergeTarget() {
            return this.target;
        }

        /** Returns the second argument of the recorded merge, or {@code null}. */
        public Collection<TimeWindow> mergeSources() {
            return this.sources;
        }

        /** Returns the third argument of the recorded merge, or {@code null}. */
        public TimeWindow stateWindow() {
            return this.stateWindow;
        }

        /** Returns the fourth argument of the recorded merge, or {@code null}. */
        public Collection<TimeWindow> mergedStateWindows() {
            return this.mergedStateWindows;
        }

        /**
         * Records the four merge arguments after asserting that no merge has been recorded yet.
         *
         * <p>No hand-written {@code Object}-typed overload is declared: the compiler generates the
         * bridge method for the generic interface itself.
         */
        public void merge(TimeWindow mergeResult, Collection<TimeWindow> mergedWindows, TimeWindow stateWindowResult, Collection<TimeWindow> mergedStateWindows) throws Exception {
            Assertions.assertThat(this.target).as("More than one merge for adding a Window should not occur.").isNull();
            this.stateWindow = stateWindowResult;
            this.target = mergeResult;
            this.mergedStateWindows = mergedStateWindows;
            this.sources = mergedWindows;
        }
    }

    // Explicit package-private no-arg constructor with an empty body; nothing is initialised here.
    MergingWindowSetTest() {
    }

    /**
     * Adds several windows through a {@link NonEagerlyMergingWindowAssigner} and checks that the
     * window returned by each {@code addWindow} call maps to a non-null state window.
     */
    @Test
    void testNonEagerMerging() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(new NonEagerlyMergingWindowAssigner(3000L), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        TimeWindow result;

        mergeFunction.reset();
        result = windowSet.addWindow(new TimeWindow(0L, 2L), mergeFunction);
        Assertions.assertThat(windowSet.getStateWindow(result)).isNotNull();

        mergeFunction.reset();
        result = windowSet.addWindow(new TimeWindow(2L, 5L), mergeFunction);
        Assertions.assertThat(windowSet.getStateWindow(result)).isNotNull();

        mergeFunction.reset();
        result = windowSet.addWindow(new TimeWindow(1L, 2L), mergeFunction);
        Assertions.assertThat(windowSet.getStateWindow(result)).isNotNull();

        mergeFunction.reset();
        result = windowSet.addWindow(new TimeWindow(10L, 12L), mergeFunction);
        Assertions.assertThat(windowSet.getStateWindow(result)).isNotNull();
    }

    /**
     * Grows two separate sessions one window at a time (gap of 3) and checks, after every step,
     * the window returned by {@code addWindow}, whether a merge was reported, the reported merge
     * arguments, and the state window that the resulting window maps to. Finally retires one
     * session and checks that only its mapping disappears.
     */
    @Test
    void testIncrementalMerging() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        // add initial window
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 4L), mergeFunction)).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 4L))).isEqualTo(new TimeWindow(0L, 4L));

        // adding the same window again must not report a merge
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 4L), mergeFunction)).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        // extend the session to (0, 5)
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(3L, 5L), mergeFunction)).isEqualTo(new TimeWindow(0L, 5L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(0L, 5L));
        Assertions.assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(mergeFunction.mergeSources()).containsExactly(new TimeWindow(0L, 4L));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).isEmpty();

        // extend the session to (0, 6); the state window stays (0, 4)
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(4L, 6L), mergeFunction)).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(mergeFunction.mergeSources()).containsExactly(new TimeWindow(0L, 5L));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).isEmpty();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 6L))).isEqualTo(new TimeWindow(0L, 4L));

        // windows already covered by (0, 6) return (0, 6) without reporting a merge
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(1L, 4L), mergeFunction)).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 4L), mergeFunction)).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(3L, 5L), mergeFunction)).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(4L, 6L), mergeFunction)).isEqualTo(new TimeWindow(0L, 6L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 6L))).isEqualTo(new TimeWindow(0L, 4L));

        // start a second, disjoint session
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(11L, 14L), mergeFunction)).isEqualTo(new TimeWindow(11L, 14L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 6L))).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(11L, 14L))).isEqualTo(new TimeWindow(11L, 14L));

        // extend the second session at its start
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(10L, 13L), mergeFunction)).isEqualTo(new TimeWindow(10L, 14L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(10L, 14L));
        Assertions.assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(11L, 14L));
        Assertions.assertThat(mergeFunction.mergeSources()).containsExactly(new TimeWindow(11L, 14L));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).isEmpty();

        // extend the second session at its end
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(12L, 15L), mergeFunction)).isEqualTo(new TimeWindow(10L, 15L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(10L, 15L));
        Assertions.assertThat(mergeFunction.stateWindow()).isEqualTo(new TimeWindow(11L, 14L));
        Assertions.assertThat(mergeFunction.mergeSources()).containsExactly(new TimeWindow(10L, 14L));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).isEmpty();

        // a covered window of the second session does not report a merge
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(11L, 14L), mergeFunction)).isEqualTo(new TimeWindow(10L, 15L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 6L))).isEqualTo(new TimeWindow(0L, 4L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(10L, 15L))).isEqualTo(new TimeWindow(11L, 14L));

        // retiring the first session removes only its mapping
        windowSet.retireWindow(new TimeWindow(0L, 6L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 6L))).isNull();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(10L, 15L))).isEqualTo(new TimeWindow(11L, 14L));
    }

    /**
     * Adds three disjoint windows and then windows that bridge the gaps between them, checking the
     * reported merge arguments. Where more than one state window could be kept, the assertions
     * accept any of the candidates.
     */
    @Test
    void testLateMerging() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        // three initially disjoint windows
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 3L), mergeFunction)).isEqualTo(new TimeWindow(0L, 3L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 3L))).isEqualTo(new TimeWindow(0L, 3L));

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(5L, 8L), mergeFunction)).isEqualTo(new TimeWindow(5L, 8L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(5L, 8L))).isEqualTo(new TimeWindow(5L, 8L));

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(10L, 13L), mergeFunction)).isEqualTo(new TimeWindow(10L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(10L, 13L))).isEqualTo(new TimeWindow(10L, 13L));

        // (8, 10) bridges (5, 8) and (10, 13)
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(8L, 10L), mergeFunction)).isEqualTo(new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.stateWindow())
                .satisfiesAnyOf(
                        kept -> Assertions.assertThat(kept).isEqualTo(new TimeWindow(5L, 8L)),
                        kept -> Assertions.assertThat(kept).isEqualTo(new TimeWindow(10L, 13L)));
        Assertions.assertThat(mergeFunction.mergeSources())
                .containsExactlyInAnyOrder(new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));
        Assertions.assertThat(mergeFunction.mergedStateWindows())
                .containsAnyOf(new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));
        Assertions.assertThat(mergeFunction.mergedStateWindows().toArray())
                .satisfiesAnyOf(
                        merged -> Assertions.assertThat(merged).containsExactly(new TimeWindow(10L, 13L)),
                        merged -> Assertions.assertThat(merged).containsExactly(new TimeWindow(5L, 8L)));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).doesNotContain(mergeFunction.mergeTarget());
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 3L))).isEqualTo(new TimeWindow(0L, 3L));

        // windows already covered by (5, 13) do not report a merge
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(5L, 8L), mergeFunction)).isEqualTo(new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(8L, 10L), mergeFunction)).isEqualTo(new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(10L, 13L), mergeFunction)).isEqualTo(new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(5L, 13L)))
                .isIn(new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));

        // (3, 5) bridges (0, 3) and (5, 13)
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(3L, 5L), mergeFunction)).isEqualTo(new TimeWindow(0L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(mergeFunction.mergeTarget()).isEqualTo(new TimeWindow(0L, 13L));
        Assertions.assertThat(mergeFunction.stateWindow())
                .isIn(new TimeWindow(0L, 3L), new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));
        Assertions.assertThat(mergeFunction.mergeSources())
                .containsExactlyInAnyOrder(new TimeWindow(0L, 3L), new TimeWindow(5L, 13L));
        Assertions.assertThat(mergeFunction.mergedStateWindows().toArray())
                .satisfiesAnyOf(
                        merged -> Assertions.assertThat(merged).containsExactly(new TimeWindow(0L, 3L)),
                        merged -> Assertions.assertThat(merged).containsExactly(new TimeWindow(5L, 8L)),
                        merged -> Assertions.assertThat(merged).containsExactly(new TimeWindow(10L, 13L)));
        Assertions.assertThat(mergeFunction.mergedStateWindows()).doesNotContain(mergeFunction.mergeTarget());
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 13L)))
                .isIn(new TimeWindow(0L, 3L), new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));
    }

    /**
     * Adds a small window and then a larger window that fully covers it; expects a merge to be
     * reported, the large window to be returned, and the small window to remain the state window.
     */
    @Test
    void testMergeLargeWindowCoveringSingleWindow() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        // add the small window
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(1L, 2L), mergeFunction)).isEqualTo(new TimeWindow(1L, 2L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(1L, 2L));

        // add the covering window
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 3L), mergeFunction)).isEqualTo(new TimeWindow(0L, 3L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 3L))).isEqualTo(new TimeWindow(1L, 2L));
    }

    /**
     * Adds the same window twice; expects neither call to report a merge and the window to keep
     * mapping to itself as state window.
     */
    @Test
    void testAddingIdenticalWindows() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(1L, 2L), mergeFunction)).isEqualTo(new TimeWindow(1L, 2L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(1L, 2L));

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(1L, 2L), mergeFunction)).isEqualTo(new TimeWindow(1L, 2L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(1L, 2L));
    }

    /**
     * Adds three disjoint windows and then one large window covering all of them. Expects a merge
     * to be reported in which two of the three original state windows are handed over as merged
     * state windows and the remaining one is kept as the state window of the result. Which of the
     * three is kept is not pinned down by this test.
     */
    @Test
    void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        // three initially disjoint windows
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(1L, 3L), mergeFunction)).isEqualTo(new TimeWindow(1L, 3L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 3L))).isEqualTo(new TimeWindow(1L, 3L));

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(5L, 8L), mergeFunction)).isEqualTo(new TimeWindow(5L, 8L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(5L, 8L))).isEqualTo(new TimeWindow(5L, 8L));

        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(10L, 13L), mergeFunction)).isEqualTo(new TimeWindow(10L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isFalse();
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(10L, 13L))).isEqualTo(new TimeWindow(10L, 13L));

        // the covering window merges all three
        mergeFunction.reset();
        Assertions.assertThat(windowSet.addWindow(new TimeWindow(0L, 13L), mergeFunction)).isEqualTo(new TimeWindow(0L, 13L));
        Assertions.assertThat(mergeFunction.hasMerged()).isTrue();
        // The candidates are the state windows actually created above: (1, 3), (5, 8), (10, 13).
        Assertions.assertThat(mergeFunction.mergedStateWindows().toArray())
                .satisfiesAnyOf(
                        merged -> Assertions.assertThat(merged)
                                .containsExactlyInAnyOrder(new TimeWindow(1L, 3L), new TimeWindow(5L, 8L)),
                        merged -> Assertions.assertThat(merged)
                                .containsExactlyInAnyOrder(new TimeWindow(1L, 3L), new TimeWindow(10L, 13L)),
                        merged -> Assertions.assertThat(merged)
                                .containsExactlyInAnyOrder(new TimeWindow(5L, 8L), new TimeWindow(10L, 13L)));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(0L, 13L)))
                .isIn(new TimeWindow(1L, 3L), new TimeWindow(5L, 8L), new TimeWindow(10L, 13L));
    }

    /**
     * Builds a window set on top of a state mock that already returns two (window, state window)
     * pairs and checks that both mappings are visible through {@code getStateWindow}.
     */
    @Test
    void testRestoreFromState() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        List<Tuple2<TimeWindow, TimeWindow>> storedMappings =
                Lists.newArrayList(
                        new Tuple2<>(new TimeWindow(17L, 42L), new TimeWindow(42L, 17L)),
                        new Tuple2<>(new TimeWindow(1L, 2L), new TimeWindow(3L, 4L)));
        Mockito.when(mockState.get()).thenReturn(storedMappings);

        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);

        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(17L, 42L))).isEqualTo(new TimeWindow(42L, 17L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(3L, 4L));
    }

    /**
     * Adds two disjoint windows, calls {@code persist()} and verifies that the state mock received
     * exactly one {@code update} call carrying both (window, state window) pairs.
     */
    @Test
    void testPersist() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);
        TestingMergeFunction mergeFunction = new TestingMergeFunction();

        windowSet.addWindow(new TimeWindow(1L, 2L), mergeFunction);
        windowSet.addWindow(new TimeWindow(17L, 42L), mergeFunction);
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(1L, 2L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(17L, 42L))).isEqualTo(new TimeWindow(17L, 42L));

        windowSet.persist();

        List<Tuple2<TimeWindow, TimeWindow>> expectedMappings =
                new ArrayList<>(
                        Arrays.asList(
                                new Tuple2<>(new TimeWindow(1L, 2L), new TimeWindow(1L, 2L)),
                                new Tuple2<>(new TimeWindow(17L, 42L), new TimeWindow(17L, 42L))));
        Mockito.verify(mockState).update(ArgumentMatchers.eq(expectedMappings));
        Mockito.verify(mockState, Mockito.times(1)).update(ArgumentMatchers.any());
    }

    /**
     * Restores a window set from a state mock, changes nothing, calls {@code persist()} and
     * verifies that nothing was written back to the state: neither {@code update} (the method the
     * persist test in this class expects to be used for writing) nor {@code add} may be called.
     */
    @Test
    void testPersistOnlyIfHaveUpdates() throws Exception {
        @SuppressWarnings("unchecked")
        ListState<Tuple2<TimeWindow, TimeWindow>> mockState = Mockito.mock(ListState.class);
        List<Tuple2<TimeWindow, TimeWindow>> storedMappings =
                Lists.newArrayList(
                        new Tuple2<>(new TimeWindow(17L, 42L), new TimeWindow(42L, 17L)),
                        new Tuple2<>(new TimeWindow(1L, 2L), new TimeWindow(3L, 4L)));
        Mockito.when(mockState.get()).thenReturn(storedMappings);

        MergingWindowSet<TimeWindow> windowSet =
                new MergingWindowSet<>(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)), mockState);

        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(17L, 42L))).isEqualTo(new TimeWindow(42L, 17L));
        Assertions.assertThat(windowSet.getStateWindow(new TimeWindow(1L, 2L))).isEqualTo(new TimeWindow(3L, 4L));

        windowSet.persist();

        // Without this check the test would pass even if persist() rewrote the unchanged state.
        Mockito.verify(mockState, Mockito.never()).update(ArgumentMatchers.any());
        Mockito.verify(mockState, Mockito.times(0)).add(ArgumentMatchers.any());
    }
}
