package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.class */
class NettyConnectionReaderTest {
    private static final int INPUT_CHANNEL_INDEX = 0;
    private CompletableFuture<Integer> requiredSegmentIdFuture;

    NettyConnectionReaderTest() {
    }

    @BeforeEach
    void before() {
        this.requiredSegmentIdFuture = new CompletableFuture<>();
    }

    @Test
    void testReadBuffer() {
        Optional readBuffer = createNettyConnectionReader(createInputChannelSupplier(1, this.requiredSegmentIdFuture)).readBuffer(0);
        Assertions.assertThat(readBuffer).isPresent();
        Assertions.assertThat(((Buffer) readBuffer.get()).isBuffer()).isTrue();
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
    }

    @Test
    void testReadEmptyBuffer() {
        Assertions.assertThat(createNettyConnectionReader(createInputChannelSupplier(0, this.requiredSegmentIdFuture)).readBuffer(0)).isNotPresent();
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
    }

    @Test
    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
        NettyConnectionReader createNettyConnectionReader = createNettyConnectionReader(createInputChannelSupplier(0, this.requiredSegmentIdFuture));
        createNettyConnectionReader.readBuffer(0);
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
        createNettyConnectionReader.readBuffer(1);
        Assertions.assertThat(this.requiredSegmentIdFuture.get()).isEqualTo(1);
    }

    private static Supplier<InputChannel> createInputChannelSupplier(int i, CompletableFuture<Integer> completableFuture) {
        TestInputChannel testInputChannel = new TestInputChannel(new SingleInputGateBuilder().build(), 0, completableFuture);
        int i2 = 0;
        while (i2 < i) {
            try {
                testInputChannel.read(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(0), FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER), i2 == i - 1 ? Buffer.DataType.NONE : Buffer.DataType.DATA_BUFFER);
                i2++;
            } catch (IOException | InterruptedException e) {
                ExceptionUtils.rethrow(e, "Failed to create test input channel.");
            }
        }
        return () -> {
            return testInputChannel;
        };
    }

    private static NettyConnectionReader createNettyConnectionReader(Supplier<InputChannel> supplier) {
        return new NettyConnectionReaderImpl(supplier);
    }
}
