package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.class */
class AsynchronousBufferFileWriterTest {
    private static final IOManager ioManager = new IOManagerAsync();
    private static final Buffer mockBuffer = (Buffer) Mockito.mock(Buffer.class);
    private AsynchronousBufferFileWriter writer;

    AsynchronousBufferFileWriterTest() {
    }

    @AfterAll
    static void shutdown() throws Exception {
        ioManager.close();
    }

    @BeforeEach
    void setUp() throws IOException {
        this.writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
    }

    @Test
    void testAddAndHandleRequest() throws Exception {
        addRequest();
        Assertions.assertThat(this.writer.getNumberOfOutstandingRequests()).withFailMessage("Didn't increment number of outstanding requests.", new Object[0]).isOne();
        handleRequest();
        Assertions.assertThat(this.writer.getNumberOfOutstandingRequests()).withFailMessage("Didn't decrement number of outstanding requests.", new Object[0]).isZero();
    }

    @Test
    void testAddWithFailingWriter() throws Exception {
        AsynchronousBufferFileWriter asynchronousBufferFileWriter = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
        asynchronousBufferFileWriter.close();
        NetworkBuffer networkBuffer = new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        Assertions.assertThatThrownBy(() -> {
            asynchronousBufferFileWriter.writeBlock(networkBuffer);
        }).isInstanceOf(IOException.class);
        if (!networkBuffer.isRecycled()) {
            networkBuffer.recycleBuffer();
            Assertions.fail("buffer not recycled");
        }
        Assertions.assertThat(asynchronousBufferFileWriter.getNumberOfOutstandingRequests()).withFailMessage("Shouldn't increment number of outstanding requests.", new Object[0]).isZero();
    }

    @Test
    void testSubscribe() throws Exception {
        TestNotificationListener testNotificationListener = new TestNotificationListener();
        Assertions.assertThat(this.writer.registerAllRequestsProcessedListener(testNotificationListener)).withFailMessage("Allowed to subscribe w/o any outstanding requests.", new Object[0]).isFalse();
        addRequest();
        Assertions.assertThat(this.writer.registerAllRequestsProcessedListener(testNotificationListener)).withFailMessage("Didn't allow to subscribe.", new Object[0]).isTrue();
        handleRequest();
        Assertions.assertThat(testNotificationListener.getNumberOfNotifications()).withFailMessage("Listener was not notified.", new Object[0]).isOne();
    }

    @Test
    void testSubscribeAndClose() throws Exception {
        TestNotificationListener testNotificationListener = new TestNotificationListener();
        addRequest();
        addRequest();
        this.writer.registerAllRequestsProcessedListener(testNotificationListener);
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriterTest.1
            public void go() throws Exception {
                AsynchronousBufferFileWriterTest.this.writer.close();
            }
        };
        checkedThread.start();
        handleRequest();
        handleRequest();
        checkedThread.sync();
        Assertions.assertThat(testNotificationListener.getNumberOfNotifications()).withFailMessage("Listener was not notified.", new Object[0]).isOne();
    }

    @Test
    void testConcurrentSubscribeAndHandleRequest() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        TestNotificationListener testNotificationListener = new TestNotificationListener();
        Callable callable = () -> {
            return Boolean.valueOf(this.writer.registerAllRequestsProcessedListener(testNotificationListener));
        };
        Callable callable2 = () -> {
            handleRequest();
            return null;
        };
        for (int i = 0; i < 50000; i++) {
            try {
                testNotificationListener.reset();
                addRequest();
                Future submit = newFixedThreadPool.submit(callable2);
                Future submit2 = newFixedThreadPool.submit(callable);
                submit.get();
                boolean booleanValue = ((Boolean) submit2.get()).booleanValue();
                Assertions.assertThat(testNotificationListener.getNumberOfNotifications()).withFailMessage(booleanValue ? "Race: Successfully subscribed, but was never notified." : "Race: Never subscribed successfully, but was notified.", new Object[0]).isEqualTo(booleanValue ? 1 : 0);
            } finally {
                newFixedThreadPool.shutdownNow();
            }
        }
    }

    private void addRequest() throws IOException {
        this.writer.writeBlock(mockBuffer);
    }

    private void handleRequest() {
        this.writer.handleProcessedBuffer(mockBuffer, (IOException) null);
    }
}
