package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureTaskWithException;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.class */
public class TaskMailboxProcessorTest {
    public static final int DEFAULT_PRIORITY = 0;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest$MailboxThread.class */
    /**
     * Helper thread that builds a {@link MailboxProcessor} using itself as the default action and
     * then runs the mailbox loop once the test has released it through {@link #signalStart()}.
     *
     * <p>Anything thrown while waiting for the start signal or while running the loop is kept and
     * can be re-raised on the test thread with {@link #checkException()}.
     */
    public static class MailboxThread extends Thread implements MailboxDefaultAction {
        MailboxProcessor mailboxProcessor;
        OneShotLatch mailboxCreatedLatch = new OneShotLatch();
        OneShotLatch canRun = new OneShotLatch();
        private Throwable caughtException;

        MailboxThread() {
        }

        /** Creates the processor, announces it, waits for the start signal, then runs the loop. */
        @Override
        public final void run() {
            mailboxProcessor = new MailboxProcessor(this);
            mailboxCreatedLatch.trigger();
            try {
                canRun.await();
                mailboxProcessor.runMailboxLoop();
            } catch (Throwable failure) {
                // Remember the failure so the test thread can inspect it after join().
                caughtException = failure;
            }
        }

        /** Default action of the base class: immediately reports that all actions are done. */
        public void runDefaultAction(MailboxDefaultAction.Controller controller) throws Exception {
            controller.allActionsCompleted();
        }

        /**
         * Blocks until {@link #run()} has created the processor and returns it.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *     interrupted; the interrupt flag is restored first
         */
        final MailboxProcessor getMailboxProcessor() {
            try {
                mailboxCreatedLatch.await();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
            return mailboxProcessor;
        }

        /** Releases the mailbox loop, but only if the processor has already been created. */
        final void signalStart() {
            if (!mailboxCreatedLatch.isTriggered()) {
                return;
            }
            canRun.trigger();
        }

        /**
         * Re-raises, wrapped in an {@link Exception}, whatever {@link #run()} caught.
         *
         * @throws Exception if the thread recorded a failure
         */
        void checkException() throws Exception {
            if (caughtException == null) {
                return;
            }
            throw new Exception(caughtException);
        }
    }

    /** After {@code prepareClose()} the mailbox executor is expected to reject new mails. */
    @Test
    public void testRejectIfNotOpen() {
        MailboxProcessor closingProcessor = new MailboxProcessor(controller -> {
        });
        closingProcessor.prepareClose();
        try {
            // Priority 0 is the value of DEFAULT_PRIORITY.
            MailboxExecutor executor = closingProcessor.getMailboxExecutor(0);
            executor.execute(() -> {
            }, "dummy");
            Assert.fail("Should not be able to accept runnables if not opened.");
        } catch (RejectedExecutionException expected) {
            // Rejection is exactly the outcome this test is looking for.
        }
    }

    /**
     * Submits a mail that throws a {@link FlinkException} from a second thread and expects the
     * test to end with an exception whose message contains {@code "Expected"}.
     *
     * <p>The processor is managed by try-with-resources so that it is closed on both the normal
     * and the exceptional path, and a failure in {@code close()} can never mask the primary
     * exception (it is attached as suppressed instead).
     */
    @Test
    public void testSubmittingRunnableWithException() throws Exception {
        this.expectedException.expectMessage("Expected");
        try (MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {
        })) {
            Thread submitThread = new Thread(() -> {
                mailboxProcessor.getMainMailboxExecutor().execute(this::throwFlinkException, "testSubmittingRunnableWithException");
            });
            submitThread.start();
            // NOTE(review): presumably the exception of the submitted mail surfaces here — confirm.
            mailboxProcessor.runMailboxLoop();
            submitThread.join();
        }
    }

    /**
     * Mail body used by {@link #testSubmittingRunnableWithException()}; always fails.
     *
     * @throws FlinkException always, with the message {@code "Expected"}
     */
    private void throwFlinkException() throws FlinkException {
        final String message = "Expected";
        throw new FlinkException(message);
    }

    /**
     * Enqueues a future-backed mail, then shuts the processor down in two steps and checks that
     * new mails are rejected after {@code prepareClose()} and that the pending mail is cancelled
     * by {@code close()}.
     */
    @Test
    public void testShutdown() {
        MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {
        });
        // Parameterized instead of a raw type; the mail produces no result.
        FutureTaskWithException<Void> pendingMail = new FutureTaskWithException<>(() -> {
        });
        mailboxProcessor.getMailboxExecutor(0).execute(pendingMail, "testRunnableFuture");
        mailboxProcessor.prepareClose();
        try {
            mailboxProcessor.getMailboxExecutor(0).execute(() -> {
            }, "dummy");
            Assert.fail("Should not be able to accept runnables if not opened.");
        } catch (RejectedExecutionException expected) {
            // Rejection is the expected outcome once prepareClose() has been called.
        }
        // The mail was never run, so it is neither completed nor cancelled yet.
        Assert.assertFalse(pendingMail.isDone());
        mailboxProcessor.close();
        Assert.assertTrue(pendingMail.isCancelled());
    }

    /**
     * Runs a default action that idles until a mail flips a stop flag, then completes.
     *
     * <p>The test has no value assertions: it passes when the mailbox thread terminates. Because
     * {@link MailboxThread#run()} captures every throwable, the captured failure is re-raised
     * after the join so that a crashed mailbox loop cannot be mistaken for a clean shutdown.
     */
    @Test
    public void testRunDefaultActionAndMails() throws Exception {
        final AtomicBoolean stop = new AtomicBoolean(false);
        MailboxThread mailboxThread = new MailboxThread() {
            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) throws Exception {
                if (stop.get()) {
                    controller.allActionsCompleted();
                } else {
                    Thread.sleep(10L);
                }
            }
        };
        MailboxProcessor mailboxProcessor = start(mailboxThread);
        mailboxProcessor.getMailboxExecutor(0).execute(() -> {
            stop.set(true);
        }, "stop");
        mailboxThread.join();
        // Surface anything the mailbox thread caught instead of passing silently.
        mailboxThread.checkException();
    }

    /** The default action is invoked repeatedly until it reports completion on its third run. */
    @Test
    public void testRunDefaultAction() throws Exception {
        final AtomicInteger invocations = new AtomicInteger(0);
        MailboxThread mailboxThread = new MailboxThread() {
            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                int current = invocations.incrementAndGet();
                if (current == 3) {
                    controller.allActionsCompleted();
                }
            }
        };
        start(mailboxThread);
        mailboxThread.join();
        Assert.assertEquals(3L, invocations.get());
    }

    /**
     * Suspends the default action on its third invocation, resumes it through a mail, and expects
     * it to run until its sixth invocation, where it reports completion.
     */
    @Test
    public void testSignalUnAvailable() throws Exception {
        final AtomicInteger invocations = new AtomicInteger(0);
        final AtomicReference<MailboxDefaultAction.Suspension> suspensionRef = new AtomicReference<>();
        final OneShotLatch suspendedLatch = new OneShotLatch();
        MailboxThread mailboxThread = new MailboxThread() {
            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                if (invocations.incrementAndGet() == 3) {
                    suspensionRef.set(controller.suspendDefaultAction());
                    suspendedLatch.trigger();
                } else if (invocations.get() == 6) {
                    controller.allActionsCompleted();
                }
            }
        };
        MailboxProcessor mailboxProcessor = start(mailboxThread);
        suspendedLatch.await();
        // While suspended, the default action must not have been invoked again.
        Assert.assertEquals(3L, invocations.get());
        MailboxDefaultAction.Suspension suspension = suspensionRef.get();
        mailboxProcessor.getMailboxExecutor(0).execute(suspension::resume, "resume");
        mailboxThread.join();
        // Report the root cause first if the mailbox thread died with an exception.
        mailboxThread.checkException();
        Assert.assertEquals(6L, invocations.get());
    }

    /**
     * Ping-pong between the mailbox thread, which suspends its default action on every invocation,
     * and a helper thread, which keeps resuming it (or sends a no-op mail when there is nothing to
     * resume). The default action reports completion on its 10000th invocation.
     */
    @Test
    public void testSignalUnAvailablePingPong() throws Exception {
        final AtomicReference<MailboxDefaultAction.Suspension> suspensionRef = new AtomicReference<>();
        MailboxThread mailboxThread = new MailboxThread() {
            int count = 0;

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                // The previous suspension must have been picked up by the helper thread already.
                Assert.assertTrue(suspensionRef.compareAndSet(null, controller.suspendDefaultAction()));
                this.count++;
                if (this.count == 10000) {
                    controller.allActionsCompleted();
                } else if (this.count % 1000 == 0) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        mailboxThread.start();
        MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
        Thread asyncUnblocker = new Thread(() -> {
            int iterations = 0;
            while (!Thread.currentThread().isInterrupted()) {
                MailboxDefaultAction.Suspension suspension = suspensionRef.getAndSet(null);
                if (suspension != null) {
                    mailboxProcessor.getMailboxExecutor(0).execute(suspension::resume, "resume");
                } else {
                    try {
                        mailboxProcessor.getMailboxExecutor(0).execute(() -> {
                        }, "dummy");
                    } catch (RejectedExecutionException ignored) {
                        // Deliberately ignored: a rejected no-op mail does not affect the test.
                    }
                }
                iterations++;
                if (iterations % 5000 == 0) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        asyncUnblocker.start();
        mailboxThread.signalStart();
        mailboxThread.join();
        asyncUnblocker.interrupt();
        asyncUnblocker.join();
        mailboxThread.checkException();
    }

    /** Calling {@code allActionsCompleted()} on an already closed processor must not throw. */
    @Test
    public void testCancelAfterClose() {
        MailboxProcessor closedProcessor = new MailboxProcessor(controller -> {
        });
        closedProcessor.close();
        // No assertion needed: the test passes as long as this call returns normally.
        closedProcessor.allActionsCompleted();
    }

    /**
     * Starts the given thread, waits until it has created its processor, and releases its mailbox
     * loop.
     *
     * @param mailboxThread the not yet started mailbox thread
     * @return the processor created by {@code mailboxThread}
     */
    private static MailboxProcessor start(MailboxThread mailboxThread) {
        mailboxThread.start();
        // Fetching the processor first guarantees it exists before signalStart() checks for it.
        final MailboxProcessor processor = mailboxThread.getMailboxProcessor();
        mailboxThread.signalStart();
        return processor;
    }

    /**
     * A mail that re-enqueues itself every time it runs must not starve the default action: both
     * the default action and the mail are expected to have run exactly three times.
     */
    @Test
    public void testAvoidStarvation() throws Exception {
        final AtomicInteger defaultActionRuns = new AtomicInteger(0);
        MailboxThread mailboxThread = new MailboxThread() {
            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                int runs = defaultActionRuns.incrementAndGet();
                if (runs == 3) {
                    controller.allActionsCompleted();
                }
            }
        };
        mailboxThread.start();
        MailboxProcessor processor = mailboxThread.getMailboxProcessor();
        final MailboxExecutor mailboxExecutor = processor.getMailboxExecutor(0);
        final AtomicInteger mailRuns = new AtomicInteger();
        // Anonymous class rather than a lambda because the mail has to re-submit itself.
        RunnableWithException selfResubmittingMail = new RunnableWithException() {
            @Override
            public void run() {
                mailboxExecutor.execute(this, "Blocking mail" + mailRuns.incrementAndGet());
            }
        };
        mailboxExecutor.execute(selfResubmittingMail, "Blocking mail" + mailRuns.get());
        mailboxThread.signalStart();
        mailboxThread.join();
        Assert.assertEquals(3L, defaultActionRuns.get());
        Assert.assertEquals(3L, mailRuns.get());
    }

    /**
     * Suspends a running mailbox loop from a second thread, checks that a mail enqueued after the
     * suspension has not run yet, and then resumes the loop so that the mail stops it.
     *
     * <p>Failures of the helper thread are recorded and reported after the join instead of being
     * swallowed; otherwise a failed submission of the "stop" mail would leave the second
     * {@code runMailboxLoop()} call without anything to terminate it.
     */
    @Test
    public void testSuspendRunningMailboxLoop() throws Exception {
        OneShotLatch defaultActionStarted = new OneShotLatch();
        AtomicBoolean stop = new AtomicBoolean(false);
        AtomicReference<Throwable> suspenderFailure = new AtomicReference<>();
        MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {
            defaultActionStarted.trigger();
            if (stop.get()) {
                controller.allActionsCompleted();
            }
        });
        Thread suspender = new Thread(() -> {
            try {
                defaultActionStarted.await();
                mailboxProcessor.suspend();
                mailboxProcessor.getMailboxExecutor(0).execute(() -> {
                    stop.set(true);
                }, "stop");
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                suspenderFailure.set(interrupted);
            } catch (Exception thrown) {
                suspenderFailure.set(thrown);
            }
        });
        suspender.start();
        mailboxProcessor.runMailboxLoop();
        suspender.join();
        Throwable failure = suspenderFailure.get();
        if (failure != null) {
            throw new AssertionError("Suspending thread failed", failure);
        }
        // The "stop" mail was enqueued after the suspension and must not have been run yet.
        Assert.assertFalse(stop.get());
        mailboxProcessor.runMailboxLoop();
        Assert.assertFalse(mailboxProcessor.isMailboxLoopRunning());
        Assert.assertTrue(stop.get());
    }

    /**
     * Once {@code allActionsCompleted()} has been called, neither the first nor a repeated
     * {@code runMailboxLoop()} call may invoke the default action.
     */
    @Test
    public void testResumeMailboxLoopAfterAllActionsCompleted() throws Exception {
        AtomicBoolean defaultActionInvoked = new AtomicBoolean(false);
        MailboxProcessor processor = new MailboxProcessor(controller -> {
            defaultActionInvoked.set(true);
        });
        processor.allActionsCompleted();

        // First run: the loop ends without touching the default action.
        processor.runMailboxLoop();
        boolean stillRunning = processor.isMailboxLoopRunning();
        Assert.assertFalse(stillRunning);
        Assert.assertFalse(defaultActionInvoked.get());

        // Second run: still nothing to do.
        processor.runMailboxLoop();
        Assert.assertFalse(defaultActionInvoked.get());
    }

    /**
     * A loop suspended before its first run returns without invoking the default action; running
     * it a second time invokes the default action.
     */
    @Test
    public void testResumeMailboxLoop() throws Exception {
        AtomicBoolean defaultActionInvoked = new AtomicBoolean(false);
        MailboxProcessor processor = new MailboxProcessor(controller -> {
            defaultActionInvoked.set(true);
            controller.allActionsCompleted();
        });
        processor.suspend();

        // Suspended run: the default action must stay untouched.
        processor.runMailboxLoop();
        Assert.assertFalse(defaultActionInvoked.get());

        // Resumed run: the default action executes and ends the loop.
        processor.runMailboxLoop();
        Assert.assertTrue(defaultActionInvoked.get());
    }
}
