package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Optional;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.util.CollectorOutput;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link MailboxWatermarkProcessor}.
 *
 * <p>The single scenario drives the processor through a real {@link TaskMailboxImpl} and checks
 * which watermarks reach the output before and after another mail is waiting in the mailbox.
 *
 * <p>NOTE(review): the raw {@code ArrayList}, {@code CollectorOutput} and
 * {@code MailboxWatermarkProcessor} usages look like decompilation residue; parameterize them
 * once the intended element type is confirmed.
 */
class MailboxWatermarkProcessorTest {

    /** Priority handed to the {@link MailboxExecutorImpl} under test and used for {@code tryTake}. */
    private static final int EXECUTOR_PRIORITY = 42;

    /**
     * Priority of the extra mail and of the final drain loop.
     * NOTE(review): presumably the mailbox's minimum priority constant; confirm against TaskMailbox.
     */
    private static final int DRAIN_PRIORITY = -1;

    /**
     * Emits watermarks 1..3 on an empty mailbox and expects all of them in the output, then
     * enqueues an unrelated mail, emits watermarks 4 and 5, and expects that nothing new is
     * emitted until the mailbox is drained, after which only watermark 5 is added.
     */
    @Test
    void testEmitWatermarkInsideMailbox() throws Exception {
        final TaskMailboxImpl mailbox = new TaskMailboxImpl();
        final ArrayList emitted = new ArrayList();
        final ArrayList expected = new ArrayList();

        final MailboxExecutorImpl executor =
                new MailboxExecutorImpl(mailbox, EXECUTOR_PRIORITY, StreamTaskActionExecutor.IMMEDIATE);
        final MailboxWatermarkProcessor processor =
                new MailboxWatermarkProcessor(
                        new CollectorOutput(emitted), executor, new NoOpInternalTimeServiceManager());

        // Phase 1: nothing else is queued, so every watermark is expected in the output in order.
        emitAll(processor, 1L, 2L, 3L);
        expected.add(new Watermark(1L));
        expected.add(new Watermark(2L));
        expected.add(new Watermark(3L));
        Assertions.assertThat(emitted).containsExactlyElementsOf(expected);

        // Phase 2: a no-op mail is queued before further watermarks arrive.
        mailbox.put(new Mail(() -> {
        }, DRAIN_PRIORITY, "checkpoint mail", new Object[0]));
        emitAll(processor, 4L, 5L);

        // The output must be unchanged while that mail is still pending.
        Assertions.assertThat(emitted).containsExactlyElementsOf(expected);

        // No mail may be available at the executor's own priority, and probing must not emit.
        Assertions.assertThat(mailbox.tryTake(EXECUTOR_PRIORITY)).isEqualTo(Optional.empty());
        Assertions.assertThat(emitted).containsExactlyElementsOf(expected);

        // Phase 3: run every queued mail until the mailbox is empty.
        while (mailbox.hasMail()) {
            mailbox.take(DRAIN_PRIORITY).run();
        }

        // Only watermark 5 is expected in addition; watermark 4 never appears on its own.
        expected.add(new Watermark(5L));
        Assertions.assertThat(emitted).containsExactlyElementsOf(expected);
    }

    /** Feeds one {@link Watermark} per timestamp, in the given order, into the processor. */
    private static void emitAll(MailboxWatermarkProcessor processor, long... timestamps) throws Exception {
        for (long timestamp : timestamps) {
            processor.emitWatermarkInsideMailbox(new Watermark(timestamp));
        }
    }

    /**
     * Stub {@link InternalTimeServiceManager} for this test: every operation is unsupported
     * except {@link #tryAdvanceWatermark}, which only consults the supplied stop predicate.
     */
    private static class NoOpInternalTimeServiceManager implements InternalTimeServiceManager<Object> {

        /** Not needed by the test; always throws {@link UnsupportedOperationException}. */
        public <N> InternalTimerService<N> getInternalTimerService(
                String name,
                TypeSerializer<Object> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                Triggerable<Object, N> triggerable) {
            throw new UnsupportedOperationException();
        }

        /** Not needed by the test; always throws {@link UnsupportedOperationException}. */
        public <N> InternalTimerService<N> getAsyncInternalTimerService(
                String name,
                TypeSerializer<Object> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                Triggerable<Object, N> triggerable,
                AsyncExecutionController<Object> asyncExecutionController) {
            throw new UnsupportedOperationException();
        }

        /** Not needed by the test; always throws {@link UnsupportedOperationException}. */
        public void advanceWatermark(Watermark watermark) throws Exception {
            throw new UnsupportedOperationException();
        }

        /**
         * Evaluates the stop predicate exactly once and returns its negation; the watermark
         * itself is ignored because this stub manages no timers.
         */
        public boolean tryAdvanceWatermark(
                Watermark watermark,
                InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn)
                throws Exception {
            final boolean stopRequested = shouldStopAdvancingFn.test();
            return !stopRequested;
        }

        /** Not needed by the test; always throws {@link UnsupportedOperationException}. */
        public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream stream, String operatorName)
                throws Exception {
            throw new UnsupportedOperationException();
        }
    }
}
