package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.class */
class HsSubpartitionViewTest {
    HsSubpartitionViewTest() {
    }

    @Test
    void testGetNextBufferFromDisk() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        ResultSubpartition.BufferAndBacklog createBufferAndBacklog = createBufferAndBacklog(1, Buffer.DataType.DATA_BUFFER, 0);
        CompletableFuture completableFuture = new CompletableFuture();
        TestingHsDataView build = TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog);
        }).build();
        TestingHsDataView build2 = TestingHsDataView.builder().setConsumeBufferFunction(num2 -> {
            completableFuture.complete(null);
            return Optional.empty();
        }).build();
        createSubpartitionView.setDiskDataView(build);
        createSubpartitionView.setMemoryDataView(build2);
        ResultSubpartition.BufferAndBacklog nextBuffer = createSubpartitionView.getNextBuffer();
        Assertions.assertThat(completableFuture).isNotCompleted();
        Assertions.assertThat(nextBuffer).isSameAs(createBufferAndBacklog);
    }

    @Timeout(60)
    @Test
    void testDeadLock(@TempDir Path path) throws Exception {
        BufferPool createBufferPool = new NetworkBufferPool(10, 16).createBufferPool(10, 10);
        final HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        final CompletableFuture completableFuture = new CompletableFuture();
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionViewTest.1
            public void go() throws Exception {
                completableFuture.get();
                createSubpartitionView.getNextBuffer();
            }
        };
        HsMemoryDataManager hsMemoryDataManager = new HsMemoryDataManager(1, 16, createBufferPool, TestingSpillingStrategy.builder().setOnMemoryUsageChangedFunction((num, num2) -> {
            return Optional.empty();
        }).setDecideActionWithGlobalInfoFunction(hsSpillingInfoProvider -> {
            completableFuture.complete(null);
            try {
                checkedThread.trySync(10L);
                hsSpillingInfoProvider.getNextBufferIndexToConsume(HsConsumerId.DEFAULT);
                return HsSpillingStrategy.Decision.NO_ACTION;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).build(), new HsFileDataIndexImpl(1, path.resolve(".index"), 256, Long.MAX_VALUE), path.resolve(".data"), (BufferCompressor) null, 0L);
        hsMemoryDataManager.setOutputMetrics(HybridShuffleTestUtils.createTestingOutputMetrics());
        createSubpartitionView.setMemoryDataView(hsMemoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, createSubpartitionView));
        createSubpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
        checkedThread.start();
        hsMemoryDataManager.append(ByteBuffer.allocate(16), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test
    void testGetNextBufferFromDiskNextDataTypeIsNone() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        ResultSubpartition.BufferAndBacklog createBufferAndBacklog = createBufferAndBacklog(0, Buffer.DataType.NONE, 0);
        TestingHsDataView build = TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog);
        }).build();
        TestingHsDataView build2 = TestingHsDataView.builder().setPeekNextToConsumeDataTypeFunction(num2 -> {
            Assertions.assertThat(num2).isEqualTo(1);
            return Buffer.DataType.EVENT_BUFFER;
        }).build();
        createSubpartitionView.setDiskDataView(build);
        createSubpartitionView.setMemoryDataView(build2);
        ResultSubpartition.BufferAndBacklog nextBuffer = createSubpartitionView.getNextBuffer();
        Assertions.assertThat(nextBuffer).isNotNull();
        Assertions.assertThat(nextBuffer.buffer()).isSameAs(createBufferAndBacklog.buffer());
        Assertions.assertThat(nextBuffer.buffersInBacklog()).isEqualTo(createBufferAndBacklog.buffersInBacklog());
        Assertions.assertThat(nextBuffer.getSequenceNumber()).isEqualTo(createBufferAndBacklog.getSequenceNumber());
        Assertions.assertThat(nextBuffer.getNextDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
    }

    @Test
    void testGetNextBufferFromMemory() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        ResultSubpartition.BufferAndBacklog createBufferAndBacklog = createBufferAndBacklog(1, Buffer.DataType.DATA_BUFFER, 0);
        TestingHsDataView build = TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog);
        }).build();
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setConsumeBufferFunction(num2 -> {
            return Optional.empty();
        }).build());
        createSubpartitionView.setMemoryDataView(build);
        Assertions.assertThat(createSubpartitionView.getNextBuffer()).isSameAs(createBufferAndBacklog);
    }

    @Test
    void testGetNextBufferThrowException() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            throw new RuntimeException("expected exception.");
        }).build());
        createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        createSubpartitionView.getNextBuffer();
        Assertions.assertThat(createSubpartitionView.getFailureCause()).isInstanceOf(RuntimeException.class).hasMessageContaining("expected exception.");
        Assertions.assertThat(createSubpartitionView.isReleased()).isTrue();
    }

    @Test
    void testGetNextBufferZeroBacklog() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        ResultSubpartition.BufferAndBacklog createBufferAndBacklog = createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, 0);
        TestingHsDataView build = TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog);
        }).build();
        TestingHsDataView build2 = TestingHsDataView.builder().setGetBacklogSupplier(() -> {
            return 10;
        }).build();
        createSubpartitionView.setDiskDataView(build);
        createSubpartitionView.setMemoryDataView(build2);
        Assertions.assertThat(createSubpartitionView.getNextBuffer()).satisfies(new ThrowingConsumer[]{bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(10);
            Assertions.assertThat(bufferAndBacklog.buffer()).isEqualTo(createBufferAndBacklog.buffer());
            Assertions.assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(createBufferAndBacklog.getNextDataType());
            Assertions.assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(createBufferAndBacklog.getSequenceNumber());
        }});
    }

    @Test
    void testNotifyDataAvailableNeedNotify() {
        CompletableFuture completableFuture = new CompletableFuture();
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView(() -> {
            completableFuture.complete(null);
        });
        createSubpartitionView.setMemoryDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog(0, Buffer.DataType.NONE, 0));
        }).build());
        createSubpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
        createSubpartitionView.getNextBuffer();
        createSubpartitionView.notifyDataAvailable();
        Assertions.assertThat(completableFuture).isCompleted();
    }

    @Test
    void testNotifyDataAvailableNotNeedNotify() {
        CompletableFuture completableFuture = new CompletableFuture();
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView(() -> {
            completableFuture.complete(null);
        });
        createSubpartitionView.setMemoryDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, 0));
        }).build());
        createSubpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
        createSubpartitionView.getNextBuffer();
        createSubpartitionView.notifyDataAvailable();
        Assertions.assertThat(completableFuture).isNotCompleted();
    }

    @Test
    void testGetZeroBacklogNeedNotify() {
        CompletableFuture completableFuture = new CompletableFuture();
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView(() -> {
            completableFuture.complete(null);
        });
        createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setGetBacklogSupplier(() -> {
            return 0;
        }).build());
        Assertions.assertThat(createSubpartitionView.getAvailabilityAndBacklog(0).getBacklog()).isZero();
        Assertions.assertThat(completableFuture).isNotCompleted();
        createSubpartitionView.notifyDataAvailable();
        Assertions.assertThat(completableFuture).isCompleted();
    }

    @Test
    void testGetAvailabilityAndBacklogPositiveCredit() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setGetBacklogSupplier(() -> {
            return 2;
        }).build());
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = createSubpartitionView.getAvailabilityAndBacklog(1);
        Assertions.assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat(availabilityAndBacklog.isAvailable()).isTrue();
    }

    @Test
    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setMemoryDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog(2, Buffer.DataType.DATA_BUFFER, 0));
        }).build());
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setGetBacklogSupplier(() -> {
            return 2;
        }).build());
        createSubpartitionView.getNextBuffer();
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = createSubpartitionView.getAvailabilityAndBacklog(0);
        Assertions.assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat(availabilityAndBacklog.isAvailable()).isFalse();
    }

    @Test
    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setMemoryDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog(2, Buffer.DataType.EVENT_BUFFER, 0));
        }).build());
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setGetBacklogSupplier(() -> {
            return 2;
        }).build());
        createSubpartitionView.getNextBuffer();
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = createSubpartitionView.getAvailabilityAndBacklog(0);
        Assertions.assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat(availabilityAndBacklog.isAvailable()).isTrue();
    }

    @Test
    void testRelease() throws Exception {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingHsDataView build = TestingHsDataView.builder().setReleaseDataViewRunnable(() -> {
            completableFuture.complete(null);
        }).build();
        TestingHsDataView build2 = TestingHsDataView.builder().setReleaseDataViewRunnable(() -> {
            completableFuture2.complete(null);
        }).build();
        createSubpartitionView.setDiskDataView(build);
        createSubpartitionView.setMemoryDataView(build2);
        createSubpartitionView.releaseAllResources();
        Assertions.assertThat(createSubpartitionView.isReleased()).isTrue();
        Assertions.assertThat(completableFuture).isCompleted();
        Assertions.assertThat(completableFuture2).isCompleted();
    }

    @Test
    void testGetConsumingOffset() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setDiskDataView(TestingHsDataView.builder().setConsumeBufferFunction(num -> {
            return Optional.of(createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, atomicInteger.getAndIncrement()));
        }).build());
        createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        Assertions.assertThat(createSubpartitionView.getConsumingOffset(true)).isEqualTo(-1);
        createSubpartitionView.getNextBuffer();
        Assertions.assertThat(createSubpartitionView.getConsumingOffset(true)).isEqualTo(0);
        createSubpartitionView.getNextBuffer();
        Assertions.assertThat(createSubpartitionView.getConsumingOffset(true)).isEqualTo(1);
    }

    @Test
    void testSetDataViewRepeatedly() {
        HsSubpartitionConsumer createSubpartitionView = createSubpartitionView();
        createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("repeatedly set memory data view is not allowed.");
        createSubpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
        Assertions.assertThatThrownBy(() -> {
            createSubpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("repeatedly set disk data view is not allowed.");
    }

    private static HsSubpartitionConsumer createSubpartitionView() {
        return new HsSubpartitionConsumer(new NoOpBufferAvailablityListener());
    }

    private static HsSubpartitionConsumer createSubpartitionView(BufferAvailabilityListener bufferAvailabilityListener) {
        return new HsSubpartitionConsumer(bufferAvailabilityListener);
    }

    private static ResultSubpartition.BufferAndBacklog createBufferAndBacklog(int i, Buffer.DataType dataType, int i2) {
        return new ResultSubpartition.BufferAndBacklog(HybridShuffleTestUtils.createBuffer(8, true), i, dataType, i2);
    }
}
