package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.class */
/**
 * Tests for {@link DiskIOScheduler}.
 *
 * <p>Every test drives the scheduler through a {@link ManuallyTriggeredScheduledExecutorService},
 * so scheduled work only runs when the test calls {@code trigger()} and runs on the calling thread.
 */
class DiskIOSchedulerTest {

    private static final int BUFFER_POOL_SIZE = 1;

    private static final int DEFAULT_MAX_READ_AHEAD = 5;

    private static final TieredStoragePartitionId DEFAULT_PARTITION_ID =
            TieredStorageIdMappingUtils.convertId(new ResultPartitionID());

    private static final TieredStorageSubpartitionId DEFAULT_SUBPARTITION_ID =
            new TieredStorageSubpartitionId(0);

    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);

    /**
     * Segment id value the test writers react to; payloads with any other segment id are ignored.
     * NOTE(review): presumably -1 marks a payload that carries a buffer rather than a segment-id
     * notification - confirm against NettyPayload.
     */
    private static final int NO_SEGMENT_ID = -1;

    private BatchShuffleReadBufferPool bufferPool;

    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    /** Completed with the second argument of the first read issued to the testing file reader. */
    private CompletableFuture<Integer> segmentIdFuture;

    /** Completed when the testing file reader's release notifier fires. */
    private CompletableFuture<Void> readerReleaseFuture;

    private DiskIOScheduler diskIOScheduler;

    /** Lookup table backing the function handed to the scheduler; indexed by its first argument. */
    private List<Map<Integer, Integer>> firstBufferIndexInSegment;

    @BeforeEach
    void before() {
        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
        // NOTE(review): the literal arguments here and in the scheduler constructor were inlined by
        // the compiler; BUFFER_POOL_SIZE probably belongs to one of them - confirm before replacing.
        bufferPool = new BatchShuffleReadBufferPool(1L, 1);
        bufferPool.initialize();
        segmentIdFuture = new CompletableFuture<>();
        readerReleaseFuture = new CompletableFuture<>();
        firstBufferIndexInSegment = createFirstBufferIndexInSegment();

        // Lambda parameter names below are inferred from how the values are used in this test.
        PartitionFileReader partitionFileReader =
                new TestingPartitionFileReader.Builder()
                        .setReadBufferSupplier(
                                (bufferIndex, segmentId) -> {
                                    segmentIdFuture.complete(segmentId);
                                    return new PartitionFileReader.ReadBufferResult(
                                            Collections.singletonList(
                                                    BufferBuilderTestUtils.buildSomeBuffer(0)),
                                            true,
                                            (PartitionFileReader.ReadProgress) null);
                                })
                        .setReleaseNotifier(() -> readerReleaseFuture.complete(null))
                        .setPrioritySupplier(id -> id.longValue())
                        .build();

        diskIOScheduler =
                new DiskIOScheduler(
                        DEFAULT_PARTITION_ID,
                        bufferPool,
                        ioExecutor,
                        1,
                        DEFAULT_BUFFER_REQUEST_TIMEOUT,
                        DEFAULT_MAX_READ_AHEAD,
                        (subpartitionId, key) ->
                                firstBufferIndexInSegment.get(subpartitionId).get(key),
                        partitionFileReader);
    }

    @AfterEach
    void after() {
        bufferPool.destroy();
    }

    /** Nothing is read or written until the IO executor runs; one trigger does both. */
    @Test
    void testConnectionEstablished() {
        CompletableFuture<NettyPayload> writtenPayload = new CompletableFuture<>();

        diskIOScheduler.connectionEstablished(
                DEFAULT_SUBPARTITION_ID, payloadCapturingWriterBuilder(writtenPayload).build());

        Assertions.assertThat(segmentIdFuture).isNotDone();
        Assertions.assertThat(writtenPayload).isNotDone();

        ioExecutor.trigger();

        Assertions.assertThat(segmentIdFuture).isCompletedWithValue(0);
        Assertions.assertThat(writtenPayload).isDone();
    }

    /**
     * Connects subpartition 1 before subpartition 0 and checks that after a single trigger only
     * subpartition 0 has been written to.
     */
    @Test
    void testSequenceReading() {
        CompletableFuture<NettyPayload> writtenToSubpartition0 = new CompletableFuture<>();
        CompletableFuture<NettyPayload> writtenToSubpartition1 = new CompletableFuture<>();

        diskIOScheduler.connectionEstablished(
                new TieredStorageSubpartitionId(1),
                payloadCapturingWriterBuilder(writtenToSubpartition1).build());
        diskIOScheduler.connectionEstablished(
                new TieredStorageSubpartitionId(0),
                payloadCapturingWriterBuilder(writtenToSubpartition0).build());

        Assertions.assertThat(writtenToSubpartition0).isNotDone();
        Assertions.assertThat(writtenToSubpartition1).isNotDone();

        ioExecutor.trigger();

        Assertions.assertThat(writtenToSubpartition0).isDone();
        Assertions.assertThat(writtenToSubpartition1).isNotDone();
    }

    /** A connection broken before the IO executor runs must cause neither a read nor a write. */
    @Test
    void testConnectionBroken() {
        CompletableFuture<NettyPayload> writtenPayload = new CompletableFuture<>();
        NettyConnectionId connectionId = NettyConnectionId.newId();

        diskIOScheduler.connectionEstablished(
                DEFAULT_SUBPARTITION_ID,
                payloadCapturingWriterBuilder(writtenPayload)
                        .setNettyConnectionIdSupplier(() -> connectionId)
                        .build());
        diskIOScheduler.connectionBroken(connectionId);
        ioExecutor.trigger();

        Assertions.assertThat(segmentIdFuture).isNotDone();
        Assertions.assertThat(writtenPayload).isNotDone();
    }

    /** Releasing the scheduler releases the file reader and rejects further connections. */
    @Test
    void testRelease() {
        NettyConnectionId connectionId = NettyConnectionId.newId();
        // This test never inspects what is written, so the write function is a no-op.
        TestingNettyConnectionWriter writer =
                new TestingNettyConnectionWriter.Builder()
                        .setWriteBufferFunction(payload -> null)
                        .setNettyConnectionIdSupplier(() -> connectionId)
                        .build();

        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, writer);
        diskIOScheduler.release();

        Assertions.assertThat(readerReleaseFuture).isDone();
        Assertions.assertThatThrownBy(
                        () -> diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, writer))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * The write function announces that it has been entered and then blocks until {@code release()}
     * has returned on a second thread. The test can only finish if {@code release()} is able to
     * complete while a write call is still in progress.
     *
     * @throws Exception if the releasing thread failed
     */
    @Test
    void testDeadLock() throws Exception {
        final CompletableFuture<Void> releaseFinished = new CompletableFuture<>();
        final CompletableFuture<Void> writerEntered = new CompletableFuture<>();

        TestingNettyConnectionWriter blockingWriter =
                new TestingNettyConnectionWriter.Builder()
                        .setWriteBufferFunction(
                                payload -> {
                                    try {
                                        writerEntered.complete(null);
                                        releaseFinished.get();
                                    } catch (InterruptedException | ExecutionException e) {
                                        ExceptionUtils.rethrow(e);
                                    }
                                    return null;
                                })
                        .build();

        CheckedThread releaser =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        writerEntered.get();
                        try {
                            diskIOScheduler.release();
                        } finally {
                            // Always unblock the writer: if release() throws, the failure should
                            // surface through sync() below instead of hanging trigger() forever.
                            releaseFinished.complete(null);
                        }
                    }
                };
        releaser.start();

        diskIOScheduler.connectionEstablished(new TieredStorageSubpartitionId(0), blockingWriter);
        ioExecutor.trigger();

        Assertions.assertThat(releaseFinished).isDone();
        Assertions.assertThat(writerEntered).isDone();
        // Both futures are done at this point, so the thread is past every blocking call and the
        // join is short; sync() rethrows anything go() threw.
        releaser.sync();
    }

    /**
     * Returns a writer builder whose write function completes {@code capturedPayload} with the
     * first payload whose segment id equals {@link #NO_SEGMENT_ID} and ignores all other payloads.
     * The write function itself always returns {@code null}.
     */
    private static TestingNettyConnectionWriter.Builder payloadCapturingWriterBuilder(
            CompletableFuture<NettyPayload> capturedPayload) {
        return new TestingNettyConnectionWriter.Builder()
                .setWriteBufferFunction(
                        payload -> {
                            if (payload.getSegmentId() == NO_SEGMENT_ID) {
                                capturedPayload.complete(payload);
                            }
                            return null;
                        });
    }

    /** Builds a two-element list of mutable maps, each holding the single entry {@code 0 -> 0}. */
    private static List<Map<Integer, Integer>> createFirstBufferIndexInSegment() {
        List<Map<Integer, Integer>> perSubpartition = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            Map<Integer, Integer> mapping = new HashMap<>();
            mapping.put(0, 0);
            perSubpartition.add(mapping);
        }
        return perSubpartition;
    }
}
