package org.apache.flink.streaming.runtime.io.checkpointing;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.YieldingOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.class */
class UnalignedCheckpointsInterruptibleTimersTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest$MultipleTimersAtTheSameTimestamp.class */
    private static class MultipleTimersAtTheSameTimestamp extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable<String, String>, YieldingOperator<String> {
        private final Map<Instant, Integer> timersToRegister;

        @Nullable
        private transient MailboxExecutor mailboxExecutor;

        @Nullable
        private transient MailboxWatermarkProcessor watermarkProcessor;

        MultipleTimersAtTheSameTimestamp() {
            this(Collections.emptyMap());
        }

        MultipleTimersAtTheSameTimestamp(Map<Instant, Integer> map) {
            this.timersToRegister = map;
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public void open() throws Exception {
            super.open();
            if (getTimeServiceManager().isPresent()) {
                this.watermarkProcessor = new MailboxWatermarkProcessor(this.output, this.mailboxExecutor, (InternalTimeServiceManager) getTimeServiceManager().get());
            }
        }

        public void processElement(StreamRecord<String> streamRecord) {
            if (this.timersToRegister.isEmpty()) {
                return;
            }
            InternalTimerService internalTimerService = getInternalTimerService("timers", StringSerializer.INSTANCE, this);
            for (Map.Entry<Instant, Integer> entry : this.timersToRegister.entrySet()) {
                for (int i = 0; i < entry.getValue().intValue(); i++) {
                    setCurrentKey(String.format("key-%d", Integer.valueOf(i)));
                    internalTimerService.registerEventTimeTimer(String.format("window-%s", entry.getKey()), entry.getKey().toEpochMilli());
                }
            }
        }

        public void processWatermark(Watermark watermark) throws Exception {
            if (this.watermarkProcessor == null) {
                super.processWatermark(watermark);
            } else {
                this.watermarkProcessor.emitWatermarkInsideMailbox(watermark);
            }
        }

        public void onEventTime(InternalTimer<String, String> internalTimer) throws Exception {
            this.mailboxExecutor.execute(() -> {
                this.output.collect(UnalignedCheckpointsInterruptibleTimersTest.asMailRecord((String) internalTimer.getKey()));
            }, "mail-" + ((String) internalTimer.getKey()));
            this.output.collect(UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord((String) internalTimer.getKey()));
        }

        public void onProcessingTime(InternalTimer<String, String> internalTimer) throws Exception {
        }

        MultipleTimersAtTheSameTimestamp withTimers(Instant instant, int i) {
            HashMap hashMap = new HashMap(this.timersToRegister);
            hashMap.put(instant, Integer.valueOf(i));
            return new MultipleTimersAtTheSameTimestamp(hashMap);
        }
    }

    UnalignedCheckpointsInterruptibleTimersTest() {
    }

    @Test
    void testSingleWatermarkHoldingOperatorInTheChain() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(1000L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(2000L);
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, Types.STRING).modifyStreamConfig(UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig).addInput(Types.STRING).setupOperatorChain((StreamOperatorFactory<?>) SimpleOperatorFactory.of(new MultipleTimersAtTheSameTimestamp().withTimers(ofEpochMilli, 2).withTimers(ofEpochMilli2, 2))).name("first").finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        try {
            build.processElement(new StreamRecord("register timers"));
            build.processAll();
            build.processElement(asWatermark(ofEpochMilli));
            build.processElement(asWatermark(ofEpochMilli2));
            Assertions.assertThat(build.getOutput()).containsExactly(new Object[]{asFiredRecord("key-0"), asMailRecord("key-0"), asFiredRecord("key-1"), asMailRecord("key-1"), asWatermark(ofEpochMilli), asFiredRecord("key-0"), asMailRecord("key-0"), asFiredRecord("key-1"), asMailRecord("key-1"), asWatermark(ofEpochMilli2)});
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testWatermarkProgressWithNoTimers() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(1000L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(2000L);
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, Types.STRING).modifyStreamConfig(UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig).addInput(Types.STRING).setupOperatorChain((StreamOperatorFactory<?>) SimpleOperatorFactory.of(new MultipleTimersAtTheSameTimestamp())).name("first").finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        try {
            build.setAutoProcess(false);
            build.processElement(new StreamRecord("impulse"));
            build.processAll();
            build.processElement(asWatermark(ofEpochMilli));
            build.processElement(asWatermark(ofEpochMilli2));
            ArrayList arrayList = new ArrayList();
            while (arrayList.size() < 2) {
                build.processSingleStep();
                while (true) {
                    Object poll = build.getOutput().poll();
                    if (poll != null) {
                        if (poll instanceof Watermark) {
                            arrayList.add((Watermark) poll);
                        }
                    }
                }
            }
            Assertions.assertThat(arrayList).containsExactly(new Watermark[]{asWatermark(ofEpochMilli), asWatermark(ofEpochMilli2)});
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static Watermark asWatermark(Instant instant) {
        return new Watermark(instant.toEpochMilli());
    }

    private static StreamRecord<String> asFiredRecord(String str) {
        return new StreamRecord<>("fired-" + str);
    }

    private static StreamRecord<String> asMailRecord(String str) {
        return new StreamRecord<>("mail-" + str);
    }

    private static void setupStreamConfig(StreamConfig streamConfig) {
        streamConfig.setUnalignedCheckpointsEnabled(true);
        streamConfig.setUnalignedCheckpointsSplittableTimersEnabled(true);
        streamConfig.setStateKeySerializer(StringSerializer.INSTANCE);
    }
}
