package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.class */
class StreamOperatorWrapperTest {
    // Timer service shared by every test in this class; assigned in startTimeService()
    // and shut down in shutdownTimeService().
    private static SystemProcessingTimeService timerService;
    // Intended number of test operators in the chain built by setup().
    private static final int numOperators = 3;
    // Wrappers around the test operators; setup() links them to each other via
    // setNext()/setPrevious() in list order.
    private List<StreamOperatorWrapper<?, ?>> operatorWrappers;
    // Messages appended by the test operators and by their timer/mail callbacks;
    // testFinish() asserts on the exact contents and order of this queue.
    private ConcurrentLinkedQueue<Object> output;
    // Task that supplies the mailbox executors and the action executor used by the tests;
    // rebuilt in setup() and cleaned up in teardown().
    private volatile StreamTask<?, ?> containingTask;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest$TestOneInputStreamOperator.class */
    public static class TestOneInputStreamOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, BoundedOneInput {
        private static final long serialVersionUID = 1;
        private final String name;
        private final ConcurrentLinkedQueue<Object> output;
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final TimerMailController timerMailController;

        TestOneInputStreamOperator(String str, ConcurrentLinkedQueue<Object> concurrentLinkedQueue, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, TimerMailController timerMailController) {
            this.name = str;
            this.output = concurrentLinkedQueue;
            this.processingTimeService = processingTimeService;
            this.mailboxExecutor = mailboxExecutor;
            this.timerMailController = timerMailController;
            processingTimeService.registerTimer(Long.MAX_VALUE, j -> {
                concurrentLinkedQueue.add("[" + str + "]: Timer not triggered");
            });
            super.setProcessingTimeService(processingTimeService);
        }

        public String getName() {
            return this.name;
        }

        public void processElement(StreamRecord<String> streamRecord) {
        }

        public void endInput() throws InterruptedException {
            this.output.add("[" + this.name + "]: End of input");
            ProcessingTimeService.ProcessingTimeCallback processingTimeCallback = j -> {
                this.output.add("[" + this.name + "]: Timer that was in mailbox before closing operator");
            };
            this.processingTimeService.registerTimer(0L, processingTimeCallback);
            this.timerMailController.getInMailboxLatch(processingTimeCallback).await();
        }

        public void finish() throws Exception {
            ProcessingTimeService.ProcessingTimeCallback processingTimeCallback = j -> {
                this.output.add("[" + this.name + "]: Timer to put in mailbox when finishing operator");
            };
            Assertions.assertThat(this.processingTimeService.registerTimer(0L, processingTimeCallback)).isNotNull();
            Assertions.assertThat(this.timerMailController.getPuttingLatch(processingTimeCallback)).isNull();
            this.mailboxExecutor.execute(() -> {
                this.output.add("[" + this.name + "]: Mail to put in mailbox when finishing operator");
            }, "");
            this.output.add("[" + this.name + "]: Bye");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest$TimerMailController.class */
    /**
     * Wraps timer callbacks so that a test can observe two moments of a timer's life through
     * latches: when the wrapped callback starts running, and when the call that defers the
     * original callback through the containing task has returned.
     */
    private static class TimerMailController {
        private final StreamTask<?, ?> containingTask;
        private final MailboxExecutor mailboxExecutor;
        private final ConcurrentHashMap<ProcessingTimeService.ProcessingTimeCallback, OneShotLatch> puttingLatches = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<ProcessingTimeService.ProcessingTimeCallback, OneShotLatch> inMailboxLatches = new ConcurrentHashMap<>();

        TimerMailController(StreamTask<?, ?> task, MailboxExecutor executor) {
            containingTask = task;
            mailboxExecutor = executor;
        }

        /** Returns the "putting" latch of {@code callback}, or {@code null} if it was never wrapped. */
        OneShotLatch getPuttingLatch(ProcessingTimeService.ProcessingTimeCallback callback) {
            return puttingLatches.get(callback);
        }

        /** Returns the "in mailbox" latch of {@code callback}, or {@code null} if it was never wrapped. */
        OneShotLatch getInMailboxLatch(ProcessingTimeService.ProcessingTimeCallback callback) {
            return inMailboxLatches.get(callback);
        }

        /**
         * Registers fresh latches for {@code callback} and returns a callback that triggers them
         * around the deferred execution of the original one.
         */
        ProcessingTimeService.ProcessingTimeCallback wrapCallback(ProcessingTimeService.ProcessingTimeCallback callback) {
            puttingLatches.put(callback, new OneShotLatch());
            inMailboxLatches.put(callback, new OneShotLatch());
            return timestamp -> {
                // Latches are looked up when the timer fires, not when it is wrapped.
                OneShotLatch putting = puttingLatches.get(callback);
                putting.trigger();
                containingTask.deferCallbackToMailbox(mailboxExecutor, callback).onProcessingTime(timestamp);
                OneShotLatch inMailbox = inMailboxLatches.get(callback);
                inMailbox.trigger();
            };
        }
    }

    /** Package-private no-argument constructor; all per-test state is created in {@code setup()}. */
    StreamOperatorWrapperTest() {}

    /**
     * Creates the timer service shared by all tests of this class.
     *
     * <p>Errors reported by the timer service complete a future that is local to this method and
     * is not inspected afterwards.
     */
    @BeforeAll
    static void startTimeService() {
        // Assumes the service's error handler receives a Throwable - confirm against
        // SystemProcessingTimeService's constructor.
        CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    /**
     * Shuts down the shared timer service.
     *
     * <p>Does nothing if the service was never created, so that a failure in
     * {@code startTimeService()} is not followed by a {@code NullPointerException} from here.
     */
    @AfterAll
    static void shutdownTimeService() {
        if (timerService != null) {
            timerService.shutdownService();
        }
    }

    /**
     * Builds a fresh containing task and a chain of {@code numOperators} wrapped test operators.
     *
     * <p>Each operator gets its own mailbox executor (created with the operator's index), its own
     * {@link TimerMailController} and a processing time service that wraps timer callbacks
     * through that controller. Only the first wrapper is constructed with {@code true} as the
     * last constructor argument. The wrappers are then linked in list order.
     *
     * @throws Exception if building the mock task or closing the mock environment fails
     */
    @BeforeEach
    void setup() throws Exception {
        this.operatorWrappers = new ArrayList<>();
        this.output = new ConcurrentLinkedQueue<>();

        // The environment is only needed to build the task; try-with-resources closes it exactly
        // once, including when the body throws.
        try (MockEnvironment environment = MockEnvironment.builder().build()) {
            this.containingTask = new MockStreamTaskBuilder(environment).build();

            for (int i = 0; i < numOperators; i++) {
                MailboxExecutor mailboxExecutor = this.containingTask.getMailboxExecutorFactory().createExecutor(i);
                TimerMailController timerMailController = new TimerMailController(this.containingTask, mailboxExecutor);
                ProcessingTimeService processingTimeService = new ProcessingTimeServiceImpl(timerService, timerMailController::wrapCallback);
                TestOneInputStreamOperator operator = new TestOneInputStreamOperator("Operator" + i, this.output, processingTimeService, mailboxExecutor, timerMailController);
                this.operatorWrappers.add(new StreamOperatorWrapper<>(operator, Optional.ofNullable(operator.getProcessingTimeService()), mailboxExecutor, i == 0));
            }

            // Link every wrapper to its neighbours; the first wrapper's previous is null.
            StreamOperatorWrapper<?, ?> previous = null;
            for (StreamOperatorWrapper<?, ?> current : this.operatorWrappers) {
                if (previous != null) {
                    previous.setNext(current);
                }
                current.setPrevious(previous);
                previous = current;
            }
        }
    }

    /**
     * Cleans up the containing task created for the test.
     *
     * <p>Does nothing if no task was created, so that a failure in {@code setup()} is not
     * accompanied by an unrelated {@code NullPointerException} from here.
     *
     * @throws Exception if the task's cleanup fails
     */
    @AfterEach
    void teardown() throws Exception {
        // Read the volatile field once so the null check and the call see the same reference.
        StreamTask<?, ?> task = this.containingTask;
        if (task != null) {
            task.cleanUpInternal();
        }
    }

    /**
     * Finishes the chain starting from the first wrapper in drain mode and checks the exact
     * sequence of messages the operators recorded.
     *
     * @throws Exception if finishing the operator chain fails
     */
    @Test
    void testFinish() throws Exception {
        this.output.clear();
        this.operatorWrappers.get(0).finish(this.containingTask.getActionExecutor(), StopMode.DRAIN);

        List<Object> expected = new ArrayList<>();
        for (int i = 0; i < this.operatorWrappers.size(); i++) {
            String prefix = "[Operator" + i + "]";
            Collections.addAll(
                    expected,
                    prefix + ": End of input",
                    prefix + ": Timer that was in mailbox before closing operator",
                    prefix + ": Bye",
                    prefix + ": Mail to put in mailbox when finishing operator");
        }

        // The first two entries (Operator0's end-of-input message and the timer registered
        // there) are not expected: presumably endInput() is skipped for the wrapper that setup()
        // constructs with `true` as its last argument - TODO confirm.
        Assertions.assertThat(this.output)
                .as("Output was not correct.")
                .containsExactlyElementsOf(expected.subList(2, expected.size()));
    }

    /**
     * Verifies that an exception thrown by an operator's {@code finish()} surfaces from the
     * wrapper's {@code finish(...)} with its message intact.
     */
    @Test
    void testFinishingOperatorWithException() {
        AbstractStreamOperator<Void> failingOperator = new AbstractStreamOperator<Void>() {
            @Override
            public void finish() throws Exception {
                throw new Exception("test exception at finishing");
            }
        };

        // Integer.MAX_VALUE - 1 keeps this executor's argument distinct from the small indices
        // (0..numOperators-1) used in setup().
        StreamOperatorWrapper<?, ?> wrapper = new StreamOperatorWrapper<>(
                failingOperator,
                Optional.ofNullable(failingOperator.getProcessingTimeService()),
                this.containingTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE - 1),
                true);

        Assertions.assertThatThrownBy(() -> wrapper.finish(this.containingTask.getActionExecutor(), StopMode.DRAIN))
                .hasMessageContaining("test exception at finishing");
    }

    /**
     * Walks the wrapper chain with a {@code ReadIterator} in both directions and checks that
     * the operators are visited in the expected order.
     *
     * <p>With {@code false} as the second argument and the first wrapper as the start, the
     * operator names are expected in ascending index order; with {@code true} and the last
     * wrapper as the start, in descending order.
     */
    @Test
    void testReadIterator() {
        final int wrapperCount = this.operatorWrappers.size();

        // Forward: Operator0, Operator1, ...
        StreamOperatorWrapper.ReadIterator forward = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(0), false);
        for (int index = 0; index < wrapperCount; index++) {
            Assertions.assertThat(forward).hasNext();
            StreamOperatorWrapper<?, ?> visited = (StreamOperatorWrapper) forward.next();
            Assertions.assertThat(visited).isNotNull();
            String visitedName = getStreamOperatorFromWrapper(visited).getName();
            Assertions.assertThat(visitedName).isEqualTo("Operator" + index);
        }
        Assertions.assertThat(forward).isExhausted();

        // Backward: start at the last wrapper and expect the names in descending order.
        StreamOperatorWrapper.ReadIterator backward = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(wrapperCount - 1), true);
        for (int remaining = wrapperCount; remaining > 0; remaining--) {
            int index = remaining - 1;
            Assertions.assertThat(backward).hasNext();
            StreamOperatorWrapper<?, ?> visited = (StreamOperatorWrapper) backward.next();
            Assertions.assertThat(visited).isNotNull();
            String visitedName = getStreamOperatorFromWrapper(visited).getName();
            Assertions.assertThat(visitedName).isEqualTo("Operator" + index);
        }
        Assertions.assertThat(backward).isExhausted();
    }

    /**
     * Extracts the wrapped operator as a {@link TestOneInputStreamOperator}.
     *
     * @throws NullPointerException if the wrapper holds no operator
     * @throws ClassCastException if the wrapped operator is of a different type
     */
    private TestOneInputStreamOperator getStreamOperatorFromWrapper(StreamOperatorWrapper<?, ?> streamOperatorWrapper) {
        Object wrapped = streamOperatorWrapper.getStreamOperator();
        Objects.requireNonNull(wrapped);
        return (TestOneInputStreamOperator) wrapped;
    }
}
