package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

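/**
 * Tests for {@link MailboxExecutorImpl}, exercised mostly through the {@link MailboxExecutor}
 * interface.
 *
 * <p>Loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.class
 */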
public class MailboxExecutorImplTest {
    /** Default priority value intended for the mailbox executors created by these tests. */
    public static final int DEFAULT_PRIORITY = 0;
    /** Executor under test; created in {@code setUp()} on top of {@link #mailbox}. */
    private MailboxExecutor mailboxExecutor;
    /**
     * Single-threaded helper executor used to enqueue mail from a thread other than the test
     * thread; created in {@code setUp()} and shut down in {@code tearDown()}.
     */
    private ExecutorService otherThreadExecutor;
    /** Mailbox shared by {@link #mailboxExecutor} and {@link #mailboxProcessor}. */
    private TaskMailboxImpl mailbox;
    /** Processor created over {@link #mailbox} in {@code setUp()} with an empty default action. */
    private MailboxProcessor mailboxProcessor;

    /** JUnit rule, initialised with {@link ExpectedException#none()}. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest$TestRunnable.class */
    static class TestRunnable implements RunnableWithException {
        private Thread executedByThread = null;

        TestRunnable() {
        }

        public void run() {
            Preconditions.checkState(!isExecuted(), "Runnable was already executed before by " + this.executedByThread);
            this.executedByThread = Thread.currentThread();
        }

        boolean isExecuted() {
            return this.executedByThread != null;
        }

        Thread wasExecutedBy() {
            return this.executedByThread;
        }
    }

    /**
     * Creates a fresh mailbox with the executor and processor that operate on it, plus the
     * single-threaded helper executor that lets tests act from a second thread.
     */
    @Before
    public void setUp() throws Exception {
        mailbox = new TaskMailboxImpl();
        mailboxExecutor =
                new MailboxExecutorImpl(
                        mailbox, DEFAULT_PRIORITY, StreamTaskActionExecutor.IMMEDIATE);
        otherThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        // The processor's default action is intentionally empty.
        mailboxProcessor =
                new MailboxProcessor(ignored -> {}, mailbox, StreamTaskActionExecutor.IMMEDIATE);
    }

    /**
     * Shuts down the helper executor.
     *
     * <p>Waits up to 60 seconds for a graceful shutdown; if that is not enough, forces the
     * shutdown and waits another 60 seconds. An interrupt while waiting forces the shutdown and
     * restores the interrupt flag of the current thread.
     *
     * @throws IllegalStateException if the executor has not terminated after the forced shutdown
     */
    @After
    public void tearDown() {
        otherThreadExecutor.shutdown();
        try {
            if (otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                return;
            }
            // Graceful shutdown timed out: cancel whatever is still running and wait once more.
            otherThreadExecutor.shutdownNow();
            final boolean terminated = otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS);
            if (!terminated) {
                throw new IllegalStateException("Thread pool did not terminate on time!");
            }
        } catch (InterruptedException interrupted) {
            otherThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Follows {@code isIdle()} of an executor obtained from its own {@link MailboxProcessor}
     * across a mailbox step, draining, a newly enqueued mail and quiescing of the mailbox.
     */
    @Test
    public void testIsIdle() throws Exception {
        // Dedicated processor/executor pair; the fixtures created in setUp() are not used here.
        final MailboxProcessor processor = new MailboxProcessor();
        final MailboxExecutorImpl executor = processor.getMailboxExecutor(DEFAULT_PRIORITY);

        // Not idle before the first mailbox step has run.
        Assert.assertFalse(executor.isIdle());

        // After one mailbox step and draining the mailbox, the executor reports idle.
        processor.runMailboxStep();
        processor.mailbox.drain();
        Assert.assertTrue(executor.isIdle());

        // Enqueuing a mail makes it non-idle again.
        executor.execute(() -> {}, "");
        Assert.assertFalse(executor.isIdle());

        // Draining and then quiescing the mailbox leaves the executor non-idle.
        processor.mailbox.drain();
        processor.mailbox.quiesce();
        Assert.assertFalse(executor.isIdle());
    }

    /**
     * Enqueues a mail from the helper thread and checks that running the mail taken from the
     * mailbox executes the submitted action.
     */
    @Test
    public void testOperations() throws Exception {
        final AtomicBoolean wasExecuted = new AtomicBoolean(false);

        // Enqueue from the other thread and wait until the enqueue call has returned.
        CompletableFuture.runAsync(
                        () -> mailboxExecutor.execute(() -> wasExecuted.set(true), ""),
                        otherThreadExecutor)
                .get();

        // Take the mail out of the mailbox and run it on the test thread.
        mailbox.take(DEFAULT_PRIORITY).run();
        Assert.assertTrue(wasExecuted.get());
    }

    /**
     * Checks the effect of closing the mailbox processor: a mail that is still pending gets its
     * future cancelled, and both {@code tryYield()} and {@code yield()} are rejected with a
     * {@link TaskMailbox.MailboxClosedException} afterwards.
     */
    @Test
    public void testClose() throws Exception {
        final TestRunnable yieldRun = new TestRunnable();
        final TestRunnable leftoverRun = new TestRunnable();

        mailboxExecutor.execute(yieldRun, "yieldRun");
        // Submit the second mail from the helper thread; get() returns once submit() has returned
        // and hands back the future belonging to that mail.
        final Future<?> leftoverFuture =
                CompletableFuture.supplyAsync(
                                () -> mailboxExecutor.submit(leftoverRun, "leftoverRun"),
                                otherThreadExecutor)
                        .get();

        // A single tryYield() runs only the first mail, on the test thread.
        Assert.assertTrue(mailboxExecutor.tryYield());
        Assert.assertEquals(Thread.currentThread(), yieldRun.wasExecutedBy());
        Assert.assertFalse(leftoverFuture.isDone());
        Assert.assertFalse(leftoverFuture.isCancelled());

        mailboxProcessor.close();
        Assert.assertTrue(leftoverFuture.isCancelled());

        try {
            mailboxExecutor.tryYield();
            Assert.fail("yielding should not work after shutdown().");
        } catch (TaskMailbox.MailboxClosedException expected) {
            // Expected: yielding after mailboxProcessor.close() must be rejected.
        }

        try {
            mailboxExecutor.yield();
            Assert.fail("yielding should not work after shutdown().");
        } catch (TaskMailbox.MailboxClosedException expected) {
            // Expected: yielding after mailboxProcessor.close() must be rejected.
        }
    }

    /**
     * Checks that {@code tryYield()} runs a mail enqueued by another thread on the calling thread
     * and returns {@code false} when called again with nothing further enqueued.
     */
    @Test
    public void testTryYield() throws Exception {
        final TestRunnable enqueuedRunnable = new TestRunnable();
        final Runnable enqueueAction =
                () -> mailboxExecutor.execute(enqueuedRunnable, "testRunnable");

        // Enqueue from the helper thread and wait for the enqueue call to finish.
        CompletableFuture.runAsync(enqueueAction, otherThreadExecutor).get();

        Assert.assertTrue(mailboxExecutor.tryYield());
        Assert.assertFalse(mailboxExecutor.tryYield());
        Assert.assertEquals(Thread.currentThread(), enqueuedRunnable.wasExecutedBy());
    }

    /**
     * Checks that {@code yield()} on the test thread executes a mail that a second thread
     * enqueues, and that the enqueueing thread did not fail.
     */
    @Test
    public void testYield() throws Exception {
        // Carries any exception raised on the enqueueing thread back to the test thread.
        final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
        final TestRunnable testRunnable = new TestRunnable();

        final Thread submitThread =
                new Thread(
                        () -> {
                            try {
                                mailboxExecutor.execute(testRunnable, "testRunnable");
                            } catch (Exception e) {
                                exceptionReference.set(e);
                            }
                        });
        submitThread.start();

        // NOTE(review): relies on yield() waiting for the mail from submitThread if it has not
        // arrived yet — confirm against the MailboxExecutor contract.
        mailboxExecutor.yield();
        submitThread.join();

        Assert.assertNull(exceptionReference.get());
        Assert.assertEquals(Thread.currentThread(), testRunnable.wasExecutedBy());
    }
}
