package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.IOUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.class */
public class BufferFileWriterFileSegmentReaderTest {
    private static final int BUFFER_SIZE = 32768;
    private static final BufferRecycler BUFFER_RECYCLER = FreeingBufferRecycler.INSTANCE;
    private static final Random random = new Random();
    private static final IOManager ioManager = new IOManagerAsync();
    private BufferFileWriter writer;
    private AsynchronousBufferFileSegmentReader reader;
    private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<>();

    @AfterClass
    public static void shutdown() throws Exception {
        ioManager.close();
    }

    @Before
    public void setUpWriterAndReader() {
        FileIOChannel.ID createChannel = ioManager.createChannel();
        try {
            this.writer = ioManager.createBufferFileWriter(createChannel);
            this.reader = ioManager.createBufferFileSegmentReader(createChannel, new QueuingCallback(this.returnedFileSegments));
        } catch (IOException e) {
            tearDownWriterAndReader();
            Assert.fail("Failed to setup writer and reader.");
        }
    }

    @After
    public void tearDownWriterAndReader() {
        if (this.writer != null) {
            if (!this.writer.isClosed()) {
                IOUtils.closeQuietly(() -> {
                    this.writer.close();
                });
            }
            this.writer.deleteChannel();
        }
        if (this.reader != null) {
            if (!this.reader.isClosed()) {
                IOUtils.closeQuietly(() -> {
                    this.reader.close();
                });
            }
            this.reader.deleteChannel();
        }
        this.returnedFileSegments.clear();
    }

    @Test
    public void testWriteRead() throws IOException, InterruptedException {
        int i = 0;
        for (int i2 = 0; i2 < 1024; i2++) {
            Buffer createBuffer = createBuffer();
            i = BufferFileWriterReaderTest.fillBufferWithAscendingNumbers(createBuffer, i, getNextMultipleOf(getRandomNumberInRange(8192, 32768), 4));
            this.writer.writeBlock(createBuffer);
        }
        this.writer.close();
        for (int i3 = 0; i3 < 1024; i3++) {
            Assert.assertFalse(this.reader.hasReachedEndOfFile());
            this.reader.read();
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        if (this.reader.registerAllRequestsProcessedListener(new NotificationListener() { // from class: org.apache.flink.runtime.io.disk.iomanager.BufferFileWriterFileSegmentReaderTest.1
            public void onNotification() {
                countDownLatch.countDown();
            }
        })) {
            countDownLatch.await();
        }
        Assert.assertTrue(this.reader.hasReachedEndOfFile());
        Assert.assertEquals("Read less buffers than written.", HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, this.returnedFileSegments.size());
        int i4 = 0;
        ByteBuffer allocate = ByteBuffer.allocate(32768);
        while (true) {
            FileSegment poll = this.returnedFileSegments.poll();
            if (poll == null) {
                this.reader.close();
                return;
            }
            allocate.position(0);
            allocate.limit(poll.getLength());
            poll.getFileChannel().read(allocate, poll.getPosition());
            NetworkBuffer networkBuffer = new NetworkBuffer(MemorySegmentFactory.wrap(allocate.array()), BUFFER_RECYCLER);
            networkBuffer.setSize(poll.getLength());
            i4 = BufferFileWriterReaderTest.verifyBufferFilledWithAscendingNumbers(networkBuffer, i4);
        }
    }

    private int getRandomNumberInRange(int i, int i2) {
        return random.nextInt((i2 - i) + 1) + i;
    }

    private int getNextMultipleOf(int i, int i2) {
        int i3 = i % i2;
        return i3 == 0 ? i : (i + i2) - i3;
    }

    private Buffer createBuffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(32768), BUFFER_RECYCLER);
    }
}
