package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest.class */
class BatchExecutionInternalTimeServiceWithAsyncStateTest {
    public static final IntSerializer KEY_SERIALIZER = new IntSerializer();
    BatchExecutionKeyedStateBackend<Integer> keyedStatedBackend;
    InternalTimeServiceManager<Integer> timeServiceManager;
    TestProcessingTimeService processingTimeService;
    AsyncExecutionController<Integer> aec;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest$DummyKeyContext.class */
    private static class DummyKeyContext implements KeyContext {
        private DummyKeyContext() {
        }

        public void setCurrentKey(Object obj) {
        }

        public Object getCurrentKey() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest$LambdaTrigger.class */
    private static class LambdaTrigger<K, N> implements Triggerable<K, N> {
        private final Consumer<InternalTimer<K, N>> eventTimeHandler;
        private final Consumer<InternalTimer<K, N>> processingTimeHandler;

        public static <K, N> LambdaTrigger<K, N> eventTimeTrigger(Consumer<InternalTimer<K, N>> consumer) {
            return new LambdaTrigger<>(consumer, internalTimer -> {
                Assertions.fail("We did not expect processing timer to be triggered.");
            });
        }

        public static <K, N> LambdaTrigger<K, N> processingTimeTrigger(Consumer<InternalTimer<K, N>> consumer) {
            return new LambdaTrigger<>(internalTimer -> {
                Assertions.fail("We did not expect event timer to be triggered.");
            }, consumer);
        }

        private LambdaTrigger(Consumer<InternalTimer<K, N>> consumer, Consumer<InternalTimer<K, N>> consumer2) {
            this.eventTimeHandler = consumer;
            this.processingTimeHandler = consumer2;
        }

        public void onEventTime(InternalTimer<K, N> internalTimer) throws Exception {
            this.eventTimeHandler.accept(internalTimer);
        }

        public void onProcessingTime(InternalTimer<K, N> internalTimer) throws Exception {
            this.processingTimeHandler.accept(internalTimer);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest$TriggerWithTimerServiceAccess.class */
    private static class TriggerWithTimerServiceAccess<K, N> implements Triggerable<K, N> {
        private InternalTimerService<N> timerService;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler;

        private TriggerWithTimerServiceAccess(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer, BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer2) {
            this.eventTimeHandler = biConsumer;
            this.processingTimeHandler = biConsumer2;
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> eventTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer) {
            return new TriggerWithTimerServiceAccess<>(biConsumer, (internalTimer, internalTimerService) -> {
                Assertions.fail("We did not expect processing timer to be triggered.");
            });
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> processingTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> biConsumer) {
            return new TriggerWithTimerServiceAccess<>((internalTimer, internalTimerService) -> {
                Assertions.fail("We did not expect event timer to be triggered.");
            }, biConsumer);
        }

        public void setTimerService(InternalTimerService<N> internalTimerService) {
            this.timerService = internalTimerService;
        }

        public void onEventTime(InternalTimer<K, N> internalTimer) throws Exception {
            this.eventTimeHandler.accept(internalTimer, this.timerService);
        }

        public void onProcessingTime(InternalTimer<K, N> internalTimer) throws Exception {
            this.processingTimeHandler.accept(internalTimer, this.timerService);
        }
    }

    BatchExecutionInternalTimeServiceWithAsyncStateTest() {
    }

    @BeforeEach
    public void setup() {
        this.keyedStatedBackend = new BatchExecutionKeyedStateBackend<>(KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        this.processingTimeService = new TestProcessingTimeService();
        this.aec = new AsyncExecutionController<>(new SyncMailboxExecutor(), (str, th) -> {
        }, new MockStateExecutor(), new DeclarationManager(), 1, 100, 1000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        this.timeServiceManager = BatchExecutionInternalTimeServiceManager.create(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), new AsyncKeyedStateBackendAdaptor(this.keyedStatedBackend), (KeyGroupRange) null, getClass().getClassLoader(), new DummyKeyContext(), this.processingTimeService, Collections.emptyList(), StreamTaskCancellationContext.alwaysRunning());
    }

    @Test
    void testForEachEventTimeTimerUnsupported() {
        BatchExecutionInternalTimeServiceWithAsyncState batchExecutionInternalTimeServiceWithAsyncState = new BatchExecutionInternalTimeServiceWithAsyncState(new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(internalTimer -> {
        }));
        Assertions.assertThatThrownBy(() -> {
            batchExecutionInternalTimeServiceWithAsyncState.forEachEventTimeTimer((obj, l) -> {
                Assertions.fail("The forEachEventTimeTimer() should not be supported");
            });
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("The BatchExecutionInternalTimeService should not be used in State Processor API");
    }

    @Test
    void testForEachProcessingTimeTimerUnsupported() {
        BatchExecutionInternalTimeServiceWithAsyncState batchExecutionInternalTimeServiceWithAsyncState = new BatchExecutionInternalTimeServiceWithAsyncState(new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(internalTimer -> {
        }));
        Assertions.assertThatThrownBy(() -> {
            batchExecutionInternalTimeServiceWithAsyncState.forEachEventTimeTimer((obj, l) -> {
                Assertions.fail("The forEachProcessingTimeTimer() should not be supported");
            });
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("The BatchExecutionInternalTimeService should not be used in State Processor API");
    }

    @Test
    void testFiringEventTimeTimers() throws Exception {
        ArrayList arrayList = new ArrayList();
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(LambdaTrigger.eventTimeTrigger(internalTimer -> {
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
        }));
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        this.timeServiceManager.advanceWatermark(new Watermark(1000L));
        buildTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150L);
        this.keyedStatedBackend.setCurrentKey(2);
        Assertions.assertThat(arrayList).containsExactly(new Long[]{150L});
    }

    @Test
    void testSettingSameKeyDoesNotFireTimers() {
        ArrayList arrayList = new ArrayList();
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(LambdaTrigger.eventTimeTrigger(internalTimer -> {
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
        }));
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        this.keyedStatedBackend.setCurrentKey(1);
        Assertions.assertThat(arrayList).isEmpty();
    }

    @Test
    void testCurrentWatermark() throws Exception {
        ArrayList arrayList = new ArrayList();
        TriggerWithTimerServiceAccess eventTimeTrigger = TriggerWithTimerServiceAccess.eventTimeTrigger((internalTimer, internalTimerService) -> {
            Assertions.assertThat(internalTimerService.currentWatermark()).isEqualTo(Long.MAX_VALUE);
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
        });
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(eventTimeTrigger);
        eventTimeTrigger.setTimerService(buildTimerService);
        Assertions.assertThat(buildTimerService.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123L);
        Assertions.assertThat(buildTimerService.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.timeServiceManager.advanceWatermark(new Watermark(1000L));
        Assertions.assertThat(buildTimerService.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.keyedStatedBackend.setCurrentKey(2);
        Assertions.assertThat(buildTimerService.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 124L);
        this.timeServiceManager.advanceWatermark(Watermark.MAX_WATERMARK);
        Assertions.assertThat(arrayList).containsExactly(new Long[]{123L, 124L});
    }

    @Test
    void testProcessingTimeTimers() {
        ArrayList arrayList = new ArrayList();
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(LambdaTrigger.processingTimeTrigger(internalTimer -> {
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
        }));
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat(this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey(2);
        Assertions.assertThat(arrayList).containsExactly(new Long[]{150L});
    }

    @Test
    void testIgnoringEventTimeTimersFromWithinCallback() {
        ArrayList arrayList = new ArrayList();
        TriggerWithTimerServiceAccess eventTimeTrigger = TriggerWithTimerServiceAccess.eventTimeTrigger((internalTimer, internalTimerService) -> {
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
            internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, internalTimer.getTimestamp() + 20);
        });
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(eventTimeTrigger);
        eventTimeTrigger.setTimerService(buildTimerService);
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat(this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey(2);
        Assertions.assertThat(arrayList).containsExactly(new Long[]{150L});
    }

    @Test
    void testIgnoringProcessingTimeTimersFromWithinCallback() {
        ArrayList arrayList = new ArrayList();
        TriggerWithTimerServiceAccess processingTimeTrigger = TriggerWithTimerServiceAccess.processingTimeTrigger((internalTimer, internalTimerService) -> {
            arrayList.add(Long.valueOf(internalTimer.getTimestamp()));
            internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, internalTimer.getTimestamp() + 20);
        });
        InternalTimerService<VoidNamespace> buildTimerService = buildTimerService(processingTimeTrigger);
        processingTimeTrigger.setTimerService(buildTimerService);
        this.keyedStatedBackend.setCurrentKey(1);
        buildTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat(this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey(2);
        Assertions.assertThat(arrayList).containsExactly(new Long[]{150L});
    }

    private InternalTimerService<VoidNamespace> buildTimerService(Triggerable<Integer, VoidNamespace> triggerable) {
        BatchExecutionInternalTimeServiceWithAsyncState internalTimerService = this.timeServiceManager.getInternalTimerService("test", KEY_SERIALIZER, new VoidNamespaceSerializer(), triggerable);
        internalTimerService.setup(this.aec);
        return internalTimerService;
    }
}
