package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.class */
class InternalTimerServiceImplTest {
    private final int maxParallelism;
    private final KeyGroupRange testKeyGroupRange;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest$TestKeyContext.class */
    public static class TestKeyContext implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object obj) {
            this.key = obj;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }

    private static InternalTimer<Integer, String> anyInternalTimer() {
        return (InternalTimer) Mockito.any();
    }

    InternalTimerServiceImplTest(int i, int i2, int i3) {
        this.testKeyGroupRange = new KeyGroupRange(i, i2);
        this.maxParallelism = i3;
    }

    @TestTemplate
    void testKeyGroupStartIndexSetting() {
        Assertions.assertThat(createInternalTimerService(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), new KeyGroupRange(7, 21), new TestKeyContext(), new TestProcessingTimeService(), IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory()).getLocalKeyGroupRangeStartIdx()).isEqualTo(7);
    }

    @TestTemplate
    void testTimerAssignmentToKeyGroups() {
        HashSet[] hashSetArr = new HashSet[100];
        TestKeyContext testKeyContext = new TestKeyContext();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 100 - 1);
        InternalTimerServiceImpl createInternalTimerService = createInternalTimerService(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), keyGroupRange, testKeyContext, new TestProcessingTimeService(), IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory(keyGroupRange, 100));
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, (Triggerable) Mockito.mock(Triggerable.class));
        for (int i = 0; i < 100; i++) {
            TimerHeapInternalTimer timerHeapInternalTimer = new TimerHeapInternalTimer(10 + i, Integer.valueOf(i), "hello_world_" + i);
            int assignToKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(timerHeapInternalTimer.getKey(), 100);
            HashSet hashSet = hashSetArr[assignToKeyGroup];
            if (hashSet == null) {
                hashSet = new HashSet();
                hashSetArr[assignToKeyGroup] = hashSet;
            }
            hashSet.add(timerHeapInternalTimer);
            testKeyContext.setCurrentKey(timerHeapInternalTimer.getKey());
            createInternalTimerService.registerEventTimeTimer((String) timerHeapInternalTimer.getNamespace(), timerHeapInternalTimer.getTimestamp());
            createInternalTimerService.registerProcessingTimeTimer((String) timerHeapInternalTimer.getNamespace(), timerHeapInternalTimer.getTimestamp());
        }
        List eventTimeTimersPerKeyGroup = createInternalTimerService.getEventTimeTimersPerKeyGroup();
        List processingTimeTimersPerKeyGroup = createInternalTimerService.getProcessingTimeTimersPerKeyGroup();
        for (int i2 = 0; i2 < hashSetArr.length; i2++) {
            HashSet hashSet2 = hashSetArr[i2];
            Set set = (Set) eventTimeTimersPerKeyGroup.get(i2);
            Set set2 = (Set) processingTimeTimersPerKeyGroup.get(i2);
            if (hashSet2 == null) {
                Assertions.assertThat(set).isEmpty();
                Assertions.assertThat(set2).isEmpty();
            } else {
                Assertions.assertThat(set).isEqualTo(hashSet2);
                Assertions.assertThat(set2).isEqualTo(hashSet2);
            }
        }
    }

    @TestTemplate
    void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, new HeapPriorityQueueSetFactory(this.testKeyGroupRange, this.maxParallelism, 128));
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 30L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 20L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(5);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isEqualTo(3);
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{10L});
        testProcessingTimeService.setCurrentTime(10L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(3);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isEqualTo(2);
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{20L});
        testProcessingTimeService.setCurrentTime(20L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isZero();
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{30L});
        testProcessingTimeService.setCurrentTime(30L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isZero();
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 40L);
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
    }

    @TestTemplate
    void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{20L});
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(2L);
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{10L});
    }

    @TestTemplate
    void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        final InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{10L});
        ((Triggerable) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceImplTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Exception {
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
                return null;
            }
        }).when(triggerable)).onProcessingTime(anyInternalTimer());
        testProcessingTimeService.setCurrentTime(10L);
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{20L});
        ((Triggerable) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceImplTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Exception {
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 30L);
                return null;
            }
        }).when(triggerable)).onProcessingTime(anyInternalTimer());
        testProcessingTimeService.setCurrentTime(20L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(testProcessingTimeService.getActiveTimerTimestamps()).contains(new Long[]{30L});
    }

    @TestTemplate
    void testCurrentProcessingTime() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testProcessingTimeService.setCurrentTime(17L);
        Assertions.assertThat(createAndStartInternalTimerService.currentProcessingTime()).isEqualTo(17L);
        testProcessingTimeService.setCurrentTime(42L);
        Assertions.assertThat(createAndStartInternalTimerService.currentProcessingTime()).isEqualTo(42L);
    }

    @TestTemplate
    void testCurrentEventTime() throws Exception {
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService((Triggerable) Mockito.mock(Triggerable.class), new TestKeyContext(), new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        createAndStartInternalTimerService.advanceWatermark(17L);
        Assertions.assertThat(createAndStartInternalTimerService.currentWatermark()).isEqualTo(17L);
        createAndStartInternalTimerService.advanceWatermark(42L);
        Assertions.assertThat(createAndStartInternalTimerService.currentWatermark()).isEqualTo(42L);
    }

    @TestTemplate
    void testSetAndFireEventTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory createQueueFactory = createQueueFactory();
        TaskIOMetricGroup iOMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        InternalTimerServiceImpl createInternalTimerService = createInternalTimerService(iOMetricGroup, this.testKeyGroupRange, testKeyContext, testProcessingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory);
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createInternalTimerService.registerEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createInternalTimerService.registerEventTimeTimer("hello", 10L);
                Assertions.assertThat(createInternalTimerService.numEventTimeTimers()).isEqualTo(4);
                Assertions.assertThat(createInternalTimerService.numEventTimeTimers("hello")).isEqualTo(2);
                Assertions.assertThat(createInternalTimerService.numEventTimeTimers("ciao")).isEqualTo(2);
                createInternalTimerService.advanceWatermark(10L);
                Assertions.assertThat(iOMetricGroup.getNumFiredTimers().getCount()).isEqualTo(4L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(4))).onEventTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assertions.assertThat(createInternalTimerService.numEventTimeTimers()).isZero();
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @TestTemplate
    void testSetAndFireProcessingTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory createQueueFactory = createQueueFactory();
        TaskIOMetricGroup iOMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        InternalTimerServiceImpl createInternalTimerService = createInternalTimerService(iOMetricGroup, this.testKeyGroupRange, testKeyContext, testProcessingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory);
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                Assertions.assertThat(createInternalTimerService.numProcessingTimeTimers()).isEqualTo(4L);
                Assertions.assertThat(createInternalTimerService.numProcessingTimeTimers("hello")).isEqualTo(2);
                Assertions.assertThat(createInternalTimerService.numProcessingTimeTimers("ciao")).isEqualTo(2);
                testProcessingTimeService.setCurrentTime(10L);
                Assertions.assertThat(iOMetricGroup.getNumFiredTimers().getCount()).isEqualTo(4L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(4))).onProcessingTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assertions.assertThat(createInternalTimerService.numProcessingTimeTimers()).isZero();
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @TestTemplate
    void testDeleteEventTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isEqualTo(4);
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("hello")).isEqualTo(2);
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("ciao")).isEqualTo(2);
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.deleteEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.deleteEventTimeTimer("ciao", 10L);
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isEqualTo(2);
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("hello")).isOne();
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("ciao")).isOne();
                createAndStartInternalTimerService.advanceWatermark(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(2))).onEventTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isZero();
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @TestTemplate
    void testDeleteProcessingTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(4);
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isEqualTo(2);
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isEqualTo(2);
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.deleteProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.deleteProcessingTimeTimer("ciao", 10L);
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(2);
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isOne();
                Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isOne();
                testProcessingTimeService.setCurrentTime(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(2))).onProcessingTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isZero();
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @TestTemplate
    void testForEachEventTimeTimers() throws Exception {
        int i;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        HashSet<Tuple3> hashSet = new HashSet();
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "hello", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "hello", 10L));
        for (Tuple3 tuple3 : hashSet) {
            testKeyContext.setCurrentKey(tuple3.f0);
            createAndStartInternalTimerService.registerEventTimeTimer((String) tuple3.f1, ((Long) tuple3.f2).longValue());
        }
        HashSet hashSet2 = new HashSet();
        createAndStartInternalTimerService.forEachEventTimeTimer((str, l) -> {
            hashSet2.add(Tuple3.of((Integer) testKeyContext.getCurrentKey(), str, l));
        });
        Assertions.assertThat(hashSet2).isEqualTo(hashSet);
    }

    @TestTemplate
    void testForEachProcessingTimeTimers() throws Exception {
        int i;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        HashSet<Tuple3> hashSet = new HashSet();
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "hello", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "hello", 10L));
        for (Tuple3 tuple3 : hashSet) {
            testKeyContext.setCurrentKey(tuple3.f0);
            createAndStartInternalTimerService.registerProcessingTimeTimer((String) tuple3.f1, ((Long) tuple3.f2).longValue());
        }
        HashSet hashSet2 = new HashSet();
        createAndStartInternalTimerService.forEachProcessingTimeTimer((str, l) -> {
            hashSet2.add(Tuple3.of((Integer) testKeyContext.getCurrentKey(), str, l));
        });
        Assertions.assertThat(hashSet2).isEqualTo(hashSet);
    }

    @TestTemplate
    void testSnapshotAndRestore() throws Exception {
        testSnapshotAndRestore(2);
    }

    @TestTemplate
    void testSnapshotAndRebalancingRestore() throws Exception {
        testSnapshotAndRebalancingRestore(2);
    }

    private void testSnapshotAndRestore(int i) throws Exception {
        int i2;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i2 = keyInKeyGroupRange2;
            if (i2 != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
        testKeyContext.setCurrentKey(Integer.valueOf(i2));
        createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("ciao")).isOne();
        HashMap hashMap = new HashMap();
        Iterator it = this.testKeyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                InternalTimersSnapshotReaderWriters.getWriterForVersion(i, createAndStartInternalTimerService.snapshotTimersForKeyGroup(num.intValue()), createAndStartInternalTimerService.getKeySerializer(), createAndStartInternalTimerService.getNamespaceSerializer()).writeTimersSnapshot(new DataOutputViewStreamWrapper(byteArrayOutputStream));
                hashMap.put(num, byteArrayOutputStream.toByteArray());
                byteArrayOutputStream.close();
            } catch (Throwable th) {
                try {
                    byteArrayOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        Triggerable triggerable2 = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext2 = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> restoreTimerService = restoreTimerService(hashMap, i, triggerable2, testKeyContext2, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testProcessingTimeService.setCurrentTime(10L);
        restoreTimerService.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(2))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i2), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(2))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i2), "ciao")));
        Assertions.assertThat(restoreTimerService.numEventTimeTimers()).isZero();
    }

    private void testSnapshotAndRebalancingRestore(int i) throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory createQueueFactory = createQueueFactory();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory);
        int startKeyGroup = this.testKeyGroupRange.getStartKeyGroup() + ((this.testKeyGroupRange.getEndKeyGroup() - this.testKeyGroupRange.getStartKeyGroup()) / 2);
        KeyGroupRange keyGroupRange = new KeyGroupRange(this.testKeyGroupRange.getStartKeyGroup(), startKeyGroup);
        KeyGroupRange keyGroupRange2 = new KeyGroupRange(startKeyGroup + 1, this.testKeyGroupRange.getEndKeyGroup());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(keyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(keyGroupRange2, this.maxParallelism);
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange2));
        createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(createAndStartInternalTimerService.numEventTimeTimers("ciao")).isOne();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator it = this.testKeyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                InternalTimersSnapshotReaderWriters.getWriterForVersion(i, createAndStartInternalTimerService.snapshotTimersForKeyGroup(num.intValue()), createAndStartInternalTimerService.getKeySerializer(), createAndStartInternalTimerService.getNamespaceSerializer()).writeTimersSnapshot(new DataOutputViewStreamWrapper(byteArrayOutputStream));
                if (keyGroupRange.contains(num.intValue())) {
                    hashMap.put(num, byteArrayOutputStream.toByteArray());
                } else {
                    if (!keyGroupRange2.contains(num.intValue())) {
                        throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
                    }
                    hashMap2.put(num, byteArrayOutputStream.toByteArray());
                }
                byteArrayOutputStream.close();
            } catch (Throwable th) {
                try {
                    byteArrayOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        Triggerable triggerable2 = (Triggerable) Mockito.mock(Triggerable.class);
        Triggerable triggerable3 = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext2 = new TestKeyContext();
        TestKeyContext testKeyContext3 = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService2 = new TestProcessingTimeService();
        TestProcessingTimeService testProcessingTimeService3 = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> restoreTimerService = restoreTimerService(hashMap, i, triggerable2, testKeyContext2, testProcessingTimeService2, keyGroupRange, createQueueFactory);
        InternalTimerServiceImpl<Integer, String> restoreTimerService2 = restoreTimerService(hashMap2, i, triggerable3, testKeyContext3, testProcessingTimeService3, keyGroupRange2, createQueueFactory);
        testProcessingTimeService2.setCurrentTime(10L);
        restoreTimerService.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.never())).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.never())).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "ciao")));
        Assertions.assertThat(restoreTimerService.numEventTimeTimers()).isZero();
        testProcessingTimeService3.setCurrentTime(10L);
        restoreTimerService2.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable3, Mockito.never())).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "hello")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable3, Mockito.never())).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "ciao")));
        Assertions.assertThat(restoreTimerService2.numEventTimeTimers()).isZero();
    }

    private static int getKeyInKeyGroup(int i, int i2) {
        Random random = new Random(System.currentTimeMillis());
        int nextInt = random.nextInt();
        while (true) {
            int i3 = nextInt;
            if (KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i3), i2) == i) {
                return i3;
            }
            nextInt = random.nextInt();
        }
    }

    private static int getKeyInKeyGroupRange(KeyGroupRange keyGroupRange, int i) {
        Random random = new Random(System.currentTimeMillis());
        int nextInt = random.nextInt();
        while (true) {
            int i2 = nextInt;
            if (keyGroupRange.contains(KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i2), i))) {
                return i2;
            }
            nextInt = random.nextInt();
        }
    }

    private static InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService(Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupRange, PriorityQueueSetFactory priorityQueueSetFactory) {
        InternalTimerServiceImpl<Integer, String> createInternalTimerService = createInternalTimerService(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), keyGroupRange, keyContext, processingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, priorityQueueSetFactory);
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return createInternalTimerService;
    }

    private static InternalTimerServiceImpl<Integer, String> restoreTimerService(Map<Integer, byte[]> map, int i, Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupRange, PriorityQueueSetFactory priorityQueueSetFactory) throws Exception {
        InternalTimerServiceImpl<Integer, String> createInternalTimerService = createInternalTimerService(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), keyGroupRange, keyContext, processingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, priorityQueueSetFactory);
        Iterator it = keyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            if (map.containsKey(num)) {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(map.get(num));
                try {
                    createInternalTimerService.restoreTimersForKeyGroup(InternalTimersSnapshotReaderWriters.getReaderForVersion(i, InternalTimerServiceImplTest.class.getClassLoader()).readTimersSnapshot(new DataInputViewStreamWrapper(byteArrayInputStream)), num.intValue());
                    byteArrayInputStream.close();
                } catch (Throwable th) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
        }
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return createInternalTimerService;
    }

    private PriorityQueueSetFactory createQueueFactory() {
        return createQueueFactory(this.testKeyGroupRange, this.maxParallelism);
    }

    protected PriorityQueueSetFactory createQueueFactory(KeyGroupRange keyGroupRange, int i) {
        return new HeapPriorityQueueSetFactory(keyGroupRange, i, 128);
    }

    @Parameters(name = "start = {0}, end = {1}, max = {2}")
    private static Collection<Object[]> keyRanges() {
        return Arrays.asList(new Object[]{0, 32766, Short.MAX_VALUE}, new Object[]{0, 10, Short.MAX_VALUE}, new Object[]{0, 10, 10}, new Object[]{10, 32766, Short.MAX_VALUE}, new Object[]{2, 5, 100}, new Object[]{2, 5, 6});
    }

    private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, PriorityQueueSetFactory priorityQueueSetFactory) {
        TimerSerializer timerSerializer = new TimerSerializer(typeSerializer, typeSerializer2);
        return new InternalTimerServiceImpl<>(taskIOMetricGroup, keyGroupRange, keyContext, processingTimeService, createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory), createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory), StreamTaskCancellationContext.alwaysRunning());
    }

    private static <K, N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerQueue(String str, TimerSerializer<K, N> timerSerializer, PriorityQueueSetFactory priorityQueueSetFactory) {
        return priorityQueueSetFactory.create(str, timerSerializer);
    }
}
