package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyServiceProducer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.class */
public class TieredStorageResultSubpartitionViewTest {
    private static final int TIER_NUMBER = 2;
    private CompletableFuture<Void> availabilityListener;
    private List<NettyPayloadManager> nettyPayloadManagers;
    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;
    private TieredStorageResultSubpartitionView tieredStorageResultSubpartitionView;

    @BeforeEach
    void before() {
        this.availabilityListener = new CompletableFuture<>();
        this.nettyPayloadManagers = createNettyPayloadManagers();
        this.connectionBrokenConsumers = Arrays.asList(new CompletableFuture(), new CompletableFuture());
        this.tieredStorageResultSubpartitionView = new TieredStorageResultSubpartitionView(createBufferAvailabilityListener(this.availabilityListener), this.nettyPayloadManagers, createNettyConnectionIds(), createNettyServiceProducers(this.connectionBrokenConsumers));
    }

    @Test
    void testGetNextBuffer() throws IOException {
        checkBufferAndBacklog(this.tieredStorageResultSubpartitionView.getNextBuffer(), 0);
        this.tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1);
        Assertions.assertThat(this.availabilityListener).isDone();
        checkBufferAndBacklog(this.tieredStorageResultSubpartitionView.getNextBuffer(), 0);
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.getNextBuffer()).isNull();
    }

    @Test
    void testGetNextBufferFailed() {
        IOException iOException = new IOException();
        this.nettyPayloadManagers = createNettyPayloadQueuesWithError(iOException);
        this.tieredStorageResultSubpartitionView = new TieredStorageResultSubpartitionView(createBufferAvailabilityListener(this.availabilityListener), this.nettyPayloadManagers, createNettyConnectionIds(), createNettyServiceProducers(this.connectionBrokenConsumers));
        TieredStorageResultSubpartitionView tieredStorageResultSubpartitionView = this.tieredStorageResultSubpartitionView;
        tieredStorageResultSubpartitionView.getClass();
        Assertions.assertThatThrownBy(tieredStorageResultSubpartitionView::getNextBuffer).hasCause(iOException);
        Assertions.assertThat(this.connectionBrokenConsumers.get(0)).isDone();
    }

    @Test
    void testGetAvailabilityAndBacklog() {
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = this.tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(0);
        Assertions.assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(1);
        Assertions.assertThat(availabilityAndBacklog.isAvailable()).isEqualTo(false);
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 = this.tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(2);
        Assertions.assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(1);
        Assertions.assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
    }

    @Test
    void testNotifyRequiredSegmentId() {
        this.tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1);
        Assertions.assertThat(this.availabilityListener).isDone();
    }

    @Test
    void testReleaseAllResources() throws IOException {
        this.tieredStorageResultSubpartitionView.releaseAllResources();
        Assertions.assertThat(this.nettyPayloadManagers.get(0).getBacklog()).isZero();
        Assertions.assertThat(this.nettyPayloadManagers.get(1).getBacklog()).isZero();
        Assertions.assertThat(this.connectionBrokenConsumers.get(0)).isDone();
        Assertions.assertThat(this.connectionBrokenConsumers.get(1)).isDone();
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.isReleased()).isTrue();
    }

    @Test
    void testGetNumberOfQueuedBuffers() {
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(1);
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
    }

    private static void checkBufferAndBacklog(ResultSubpartition.BufferAndBacklog bufferAndBacklog, int i) {
        Assertions.assertThat(bufferAndBacklog).isNotNull();
        Assertions.assertThat(bufferAndBacklog.buffer()).isNotNull();
        Assertions.assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(i);
    }

    private static BufferAvailabilityListener createBufferAvailabilityListener(CompletableFuture<Void> completableFuture) {
        return () -> {
            completableFuture.complete(null);
        };
    }

    private static List<NettyPayloadManager> createNettyPayloadManagers() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
            nettyPayloadManager.add(NettyPayload.newSegment(i));
            nettyPayloadManager.add(NettyPayload.newBuffer(BufferBuilderTestUtils.buildSomeBuffer(0), 0, i));
            nettyPayloadManager.add(NettyPayload.newBuffer(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(0), FreeingBufferRecycler.INSTANCE, Buffer.DataType.END_OF_SEGMENT), 1, i));
            arrayList.add(nettyPayloadManager);
        }
        return arrayList;
    }

    private static List<NettyPayloadManager> createNettyPayloadQueuesWithError(Throwable th) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
            nettyPayloadManager.add(NettyPayload.newSegment(i));
            nettyPayloadManager.add(NettyPayload.newError(th));
            arrayList.add(nettyPayloadManager);
        }
        return arrayList;
    }

    private static List<NettyConnectionId> createNettyConnectionIds() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(NettyConnectionId.newId());
        }
        return arrayList;
    }

    private static List<NettyServiceProducer> createNettyServiceProducers(List<CompletableFuture<NettyConnectionId>> list) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            int i2 = i;
            arrayList.add(new TestingNettyServiceProducer.Builder().setConnectionBrokenConsumer(nettyConnectionId -> {
                ((CompletableFuture) list.get(i2)).complete(nettyConnectionId);
            }).build());
        }
        return arrayList;
    }
}
