/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TaskMailboxImplTest {
    private static final RunnableWithException NO_OP = () -> {};
    private static final int DEFAULT_PRIORITY = 0;
    private TaskMailbox taskMailbox;

    TaskMailboxImplTest() {
    }

    @BeforeEach
    void setUp() {
        this.taskMailbox = new TaskMailboxImpl();
    }

    @AfterEach
    void tearDown() {
        this.taskMailbox.close();
    }

    @Test
    void testContracts() throws InterruptedException {
        LinkedList<Mail> testObjects = new LinkedList<Mail>();
        Assertions.assertThat((boolean)this.taskMailbox.hasMail()).isFalse();
        for (int i = 0; i < 10; ++i) {
            Mail mail = new Mail((ThrowingRunnable)NO_OP, 0, "mail, DEFAULT_PRIORITY", new Object[0]);
            testObjects.add(mail);
            this.taskMailbox.put(mail);
            Assertions.assertThat((boolean)this.taskMailbox.hasMail()).isTrue();
        }
        while (!testObjects.isEmpty()) {
            Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo(testObjects.remove());
            Assertions.assertThat((boolean)this.taskMailbox.hasMail()).isEqualTo(!testObjects.isEmpty());
        }
    }

    @Test
    void testConcurrentPutTakeBlocking() throws Exception {
        this.testPutTake((FunctionWithException<TaskMailbox, Mail, InterruptedException>)((FunctionWithException)mailbox -> mailbox.take(0)));
    }

    @Test
    void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
        this.testPutTake((FunctionWithException<TaskMailbox, Mail, InterruptedException>)((FunctionWithException)mailbox -> {
            Optional optionalMail = mailbox.tryTake(0);
            while (!optionalMail.isPresent()) {
                optionalMail = mailbox.tryTake(0);
            }
            return (Mail)optionalMail.get();
        }));
    }

    @Test
    void testCloseUnblocks() throws InterruptedException {
        this.testAllPuttingUnblocksInternal(TaskMailbox::close);
    }

    @Test
    void testQuiesceUnblocks() throws InterruptedException {
        this.testAllPuttingUnblocksInternal(TaskMailbox::quiesce);
    }

    @Test
    void testLifeCycleQuiesce() throws InterruptedException {
        this.taskMailbox.put(new Mail((ThrowingRunnable)NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
        this.taskMailbox.put(new Mail((ThrowingRunnable)NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
        this.taskMailbox.quiesce();
        this.testLifecyclePuttingInternal();
        this.taskMailbox.take(0);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).isPresent();
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).isNotPresent();
    }

    @Test
    void testLifeCycleClose() {
        this.taskMailbox.close();
        this.testLifecyclePuttingInternal();
        Assertions.assertThatThrownBy(() -> this.taskMailbox.take(0)).isInstanceOf(TaskMailbox.MailboxClosedException.class);
        Assertions.assertThatThrownBy(() -> this.taskMailbox.tryTake(0)).isInstanceOf(TaskMailbox.MailboxClosedException.class);
    }

    private void testLifecyclePuttingInternal() {
        Assertions.assertThatThrownBy(() -> this.taskMailbox.put(new Mail((ThrowingRunnable)NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]))).isInstanceOf(TaskMailbox.MailboxClosedException.class);
        Assertions.assertThatThrownBy(() -> this.taskMailbox.put(new Mail(MailboxExecutor.MailOptions.urgent(), (ThrowingRunnable)NO_OP, Integer.MAX_VALUE, "NO_OP", new Object[0]))).isInstanceOf(TaskMailbox.MailboxClosedException.class);
    }

    private void testAllPuttingUnblocksInternal(Consumer<TaskMailbox> unblockMethod) throws InterruptedException {
        this.testUnblocksInternal(() -> this.taskMailbox.put(new Mail((ThrowingRunnable)NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0])), unblockMethod);
        this.setUp();
        this.testUnblocksInternal(() -> this.taskMailbox.put(new Mail(MailboxExecutor.MailOptions.urgent(), (ThrowingRunnable)NO_OP, Integer.MAX_VALUE, "NO_OP", new Object[0])), unblockMethod);
    }

    private void testUnblocksInternal(final RunnableWithException testMethod, Consumer<TaskMailbox> unblockMethod) throws InterruptedException {
        CheckedThread[] blockedThreads = new CheckedThread[8];
        final CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length);
        for (int i = 0; i < blockedThreads.length; ++i) {
            CheckedThread blocked;
            blockedThreads[i] = blocked = new CheckedThread(){

                public void go() throws Exception {
                    countDownLatch.countDown();
                    while (true) {
                        testMethod.run();
                    }
                }
            };
            blocked.start();
        }
        countDownLatch.await();
        unblockMethod.accept(this.taskMailbox);
        for (CheckedThread blockedThread : blockedThreads) {
            Assertions.assertThatThrownBy(() -> ((CheckedThread)blockedThread).sync()).isInstanceOf(TaskMailbox.MailboxClosedException.class);
        }
    }

    private void testPutTake(FunctionWithException<TaskMailbox, Mail, InterruptedException> takeMethod) throws Exception {
        int numThreads = 10;
        int numMailsPerThread = 1000;
        int[] results = new int[10];
        Thread[] writerThreads = new Thread[10];
        for (int i = 0; i < writerThreads.length; ++i) {
            int threadId = i;
            writerThreads[i] = new Thread(ThrowingRunnable.unchecked(() -> {
                for (int k = 0; k < 1000; ++k) {
                    this.taskMailbox.put(new Mail(() -> {
                        int n = threadId;
                        results[n] = results[n] + 1;
                    }, 0, "result " + k, new Object[0]));
                }
            }));
        }
        for (Thread writerThread : writerThreads) {
            writerThread.start();
        }
        for (Thread writerThread : writerThreads) {
            writerThread.join();
        }
        AtomicBoolean isRunning = new AtomicBoolean(true);
        this.taskMailbox.put(new Mail(() -> isRunning.set(false), 0, "POISON_MAIL, DEFAULT_PRIORITY", new Object[0]));
        while (isRunning.get()) {
            ((Mail)takeMethod.apply((Object)this.taskMailbox)).run();
        }
        for (int perThreadResult : results) {
            Assertions.assertThat((int)perThreadResult).isEqualTo(1000);
        }
    }

    @Test
    void testPutAsHeadWithPriority() throws InterruptedException {
        Mail mailA = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 2, "mailA", new Object[0]);
        Mail mailB = new Mail(() -> {}, 2, "mailB", new Object[0]);
        Mail mailC = new Mail(() -> {}, 1, "mailC", new Object[0]);
        Mail mailD = new Mail(() -> {}, 1, "mailD", new Object[0]);
        this.taskMailbox.put(mailC);
        this.taskMailbox.put(mailB);
        this.taskMailbox.put(mailD);
        this.taskMailbox.put(mailA);
        Assertions.assertThat((Object)this.taskMailbox.take(2)).isSameAs((Object)mailA);
        Assertions.assertThat((Object)this.taskMailbox.take(2)).isSameAs((Object)mailB);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(2)).isNotPresent();
        Assertions.assertThat((Object)this.taskMailbox.take(1)).isSameAs((Object)mailC);
        Assertions.assertThat((Object)this.taskMailbox.take(1)).isSameAs((Object)mailD);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(1)).isNotPresent();
    }

    @Test
    void testPutWithPriorityAndReadingFromMainMailbox() throws InterruptedException {
        Mail mailA = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 2, "mailA", new Object[0]);
        Mail mailB = new Mail(() -> {}, 2, "mailB", new Object[0]);
        Mail mailC = new Mail(() -> {}, 1, "mailC", new Object[0]);
        Mail mailD = new Mail(() -> {}, 1, "mailD", new Object[0]);
        this.taskMailbox.put(mailC);
        this.taskMailbox.put(mailB);
        this.taskMailbox.put(mailD);
        this.taskMailbox.put(mailA);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isSameAs((Object)mailA);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isSameAs((Object)mailC);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isSameAs((Object)mailB);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isSameAs((Object)mailD);
    }

    @Test
    void testBatchAndNonBatchTake() throws InterruptedException {
        List mails = IntStream.range(0, 6).mapToObj(i -> new Mail((ThrowingRunnable)NO_OP, 0, String.valueOf(i), new Object[0])).collect(Collectors.toList());
        mails.subList(0, 3).forEach(arg_0 -> ((TaskMailbox)this.taskMailbox).put(arg_0));
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        mails.subList(3, 6).forEach(arg_0 -> ((TaskMailbox)this.taskMailbox).put(arg_0));
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)((Mail)mails.get(0)));
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)((Mail)mails.get(1)));
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo(mails.get(2));
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).isEmpty();
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)((Mail)mails.get(3)));
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo(mails.get(4));
        Assertions.assertThat((List)this.taskMailbox.close()).containsExactly((Object[])new Mail[]{(Mail)mails.get(5)});
    }

    @Test
    void testBatchDrain() {
        Mail mailA = new Mail(() -> {}, Integer.MAX_VALUE, "mailA", new Object[0]);
        Mail mailB = new Mail(() -> {}, Integer.MAX_VALUE, "mailB", new Object[0]);
        this.taskMailbox.put(mailA);
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        this.taskMailbox.put(mailB);
        Assertions.assertThat((List)this.taskMailbox.drain()).containsExactly((Object[])new Mail[]{mailA, mailB});
    }

    @Test
    void testBatchPriority() throws Exception {
        Mail mailA = new Mail(() -> {}, 1, "mailA", new Object[0]);
        Mail mailB = new Mail(() -> {}, 2, "mailB", new Object[0]);
        this.taskMailbox.put(mailA);
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        this.taskMailbox.put(mailB);
        Assertions.assertThat((Object)this.taskMailbox.take(2)).isEqualTo((Object)mailB);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailA);
    }

    @Test
    void testRunExclusively() throws InterruptedException {
        CountDownLatch exclusiveCodeStarted = new CountDownLatch(1);
        int numMails = 10;
        new Thread(() -> this.taskMailbox.runExclusively(() -> {
            exclusiveCodeStarted.countDown();
            for (int index = 0; index < 10; ++index) {
                try {
                    this.taskMailbox.put(new Mail(() -> {}, 1, "mailD", new Object[0]));
                    Thread.sleep(1L);
                    continue;
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        })).start();
        exclusiveCodeStarted.await();
        Assertions.assertThat((List)this.taskMailbox.close()).hasSize(10);
    }

    @ValueSource(booleans={true, false})
    @ParameterizedTest
    void testPutHighPriorityMailDuringTakingFromBatch(boolean isAsyncPut) throws Exception {
        Mail mailA = new Mail(() -> {}, 0, "mailA", new Object[0]);
        Mail mailB = new Mail(() -> {}, 0, "mailB", new Object[0]);
        Mail mailC = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailC", new Object[0]);
        this.taskMailbox.put(mailA);
        this.taskMailbox.put(mailB);
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailA);
        this.putMail(mailC, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailC);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailB);
    }

    @ValueSource(booleans={true, false})
    @ParameterizedTest
    void testMailOrderWithBatch(boolean isAsyncPut) throws Exception {
        Mail mailA = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailA", new Object[0]);
        Mail mailB = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailB", new Object[0]);
        Mail mailC = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailC", new Object[0]);
        Mail mailD = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailD", new Object[0]);
        Mail mailE = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailE", new Object[0]);
        Mail mailF = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailF", new Object[0]);
        Mail mailG = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailG", new Object[0]);
        Mail mail1 = new Mail(() -> {}, 0, "mail1", new Object[0]);
        Mail mail2 = new Mail(() -> {}, 0, "mail2", new Object[0]);
        Mail mail3 = new Mail(() -> {}, 0, "mail3", new Object[0]);
        Mail mail4 = new Mail(() -> {}, 0, "mail4", new Object[0]);
        Mail mail5 = new Mail(() -> {}, 0, "mail5", new Object[0]);
        Mail mail6 = new Mail(() -> {}, 0, "mail6", new Object[0]);
        Mail mail7 = new Mail(() -> {}, 0, "mail7", new Object[0]);
        this.putMail(mail1, isAsyncPut);
        this.putMail(mail2, isAsyncPut);
        this.putMail(mail3, isAsyncPut);
        this.putMail(mail4, isAsyncPut);
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail1);
        this.putMail(mailA, isAsyncPut);
        this.putMail(mailB, isAsyncPut);
        this.putMail(mailC, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailA);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mailB);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo((Object)mailC);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail2);
        this.putMail(mail5, isAsyncPut);
        this.putMail(mail6, isAsyncPut);
        this.putMail(mail7, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail3);
        this.putMail(mailD, isAsyncPut);
        this.putMail(mailE, isAsyncPut);
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        this.putMail(mailF, isAsyncPut);
        this.putMail(mailG, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailD);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mailE);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo((Object)mailF);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailG);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail4);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mail5);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo((Object)mail6);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mail7);
    }

    @ValueSource(booleans={true, false})
    @ParameterizedTest
    void testOnlyOneUrgentMailAtTheSameTime(boolean isAsyncPut) throws Exception {
        Mail mailA = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailA", new Object[0]);
        Mail mailB = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailB", new Object[0]);
        Mail mailC = new Mail(MailboxExecutor.MailOptions.urgent(), () -> {}, 0, "mailC", new Object[0]);
        Mail mail1 = new Mail(() -> {}, 0, "mail1", new Object[0]);
        Mail mail2 = new Mail(() -> {}, 0, "mail2", new Object[0]);
        Mail mail3 = new Mail(() -> {}, 0, "mail3", new Object[0]);
        Mail mail4 = new Mail(() -> {}, 0, "mail4", new Object[0]);
        this.putMail(mail1, isAsyncPut);
        this.putMail(mail2, isAsyncPut);
        this.putMail(mail3, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).isEmpty();
        Assertions.assertThat((boolean)this.taskMailbox.createBatch()).isTrue();
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail1);
        this.putMail(mailA, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mailA);
        this.putMail(mailB, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mailB);
        this.putMail(mailC, isAsyncPut);
        Assertions.assertThat((Object)this.taskMailbox.take(0)).isEqualTo((Object)mailC);
        this.putMail(mail4, isAsyncPut);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail2);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).hasValue((Object)mail3);
        Assertions.assertThat((Optional)this.taskMailbox.tryTakeFromBatch()).isEmpty();
        Assertions.assertThat((Optional)this.taskMailbox.tryTake(0)).hasValue((Object)mail4);
    }

    private void putMail(Mail mail, boolean isAsyncPut) throws Exception {
        if (isAsyncPut) {
            Thread thread = new Thread(() -> this.taskMailbox.put(mail));
            thread.start();
            thread.join();
        } else {
            this.taskMailbox.put(mail);
        }
    }
}

