package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingMemoryDataManagerOperation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.class */
class HsSubpartitionMemoryDataManagerTest {
    // Subpartition id constant (0). NOTE(review): the visible constructor call in
    // createSubpartitionMemoryDataManager passes a literal 0, presumably this constant inlined.
    private static final int SUBPARTITION_ID = 0;
    // Lock shared by all tests; its read lock is handed to every data manager created by this class.
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Size in bytes of one record produced by createRecord (a single long).
    private static final int RECORD_SIZE = 8;
    // Buffer size used when creating buffer builders and data managers; defaults to one record
    // and is overridden by individual tests before they create the data manager.
    private int bufferSize = RECORD_SIZE;

    // Explicit empty package-private constructor; it performs no initialization.
    HsSubpartitionMemoryDataManagerTest() {
    }

    /** Appending a data record is expected to invoke the request-buffer-from-pool supplier. */
    @Test
    void testAppendDataRequestBuffer() throws Exception {
        CompletableFuture<Void> bufferRequested = new CompletableFuture<>();
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> {
                    // Record that the supplier was invoked before handing out a buffer.
                    bufferRequested.complete(null);
                    return HybridShuffleTestUtils.createBufferBuilder(this.bufferSize);
                })
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);

        dataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(bufferRequested).isCompleted();
    }

    /** Appending an event is expected NOT to invoke the request-buffer-from-pool supplier. */
    @Test
    void testAppendEventNotRequestBuffer() throws Exception {
        CompletableFuture<Void> bufferRequested = new CompletableFuture<>();
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> {
                    // Would flag an unexpected pool request; the supplier itself yields no buffer.
                    bufferRequested.complete(null);
                    return null;
                })
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);

        dataManager.append(createRecord(0L), Buffer.DataType.EVENT_BUFFER);

        Assertions.assertThat(bufferRequested).isNotDone();
    }

    /**
     * With room for three records per buffer, two data records must not finish any buffer, while
     * a following event is expected to raise the finished-buffer count to two (presumably the
     * partially filled data buffer plus the event buffer itself).
     */
    @Test
    void testAppendEventFinishCurrentBuffer() throws Exception {
        this.bufferSize = RECORD_SIZE * 3;
        AtomicInteger finishedBuffers = new AtomicInteger(0);
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> HybridShuffleTestUtils.createBufferBuilder(this.bufferSize))
                .setOnBufferFinishedRunnable(finishedBuffers::incrementAndGet)
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);

        dataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);
        dataManager.append(createRecord(1L), Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(finishedBuffers).hasValue(0);

        dataManager.append(createRecord(2L), Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat(finishedBuffers).hasValue(2);
    }

    /* JADX WARN: Type inference failed for: r0v18, types: [java.util.List, long, java.util.ArrayList] */
    @ValueSource(strings = {"LZ4", "LZO", "ZSTD", "NULL"})
    @ParameterizedTest
    void testCompressBufferAndConsume(String str) throws Exception {
        this.bufferSize = 80;
        BufferCompressor bufferCompressor = str.equals("NULL") ? null : new BufferCompressor(this.bufferSize, NettyShuffleEnvironmentOptions.CompressionCodec.valueOf(str));
        BufferDecompressor bufferDecompressor = str.equals("NULL") ? null : new BufferDecompressor(this.bufferSize, NettyShuffleEnvironmentOptions.CompressionCodec.valueOf(str));
        ArrayList arrayList = new ArrayList();
        TestingMemoryDataManagerOperation.Builder requestBufferFromPoolSupplier = TestingMemoryDataManagerOperation.builder().setRequestBufferFromPoolSupplier(() -> {
            return HybridShuffleTestUtils.createBufferBuilder(this.bufferSize);
        });
        arrayList.getClass();
        HsSubpartitionMemoryDataManager createSubpartitionMemoryDataManager = createSubpartitionMemoryDataManager(requestBufferFromPoolSupplier.setOnBufferConsumedConsumer((v1) -> {
            r1.add(v1);
        }).build(), bufferCompressor);
        ?? arrayList2 = new ArrayList();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            for (int i2 = 0; i2 < 10; i2++) {
                createSubpartitionMemoryDataManager.append(createRecord(j), Buffer.DataType.DATA_BUFFER);
                long j2 = j;
                j = arrayList2 + 1;
                arrayList2.add(Tuple2.of(Long.valueOf(j2), Buffer.DataType.DATA_BUFFER));
            }
        }
        createSubpartitionMemoryDataManager.append(createRecord(j), Buffer.DataType.EVENT_BUFFER);
        arrayList2.add(Tuple2.of(Long.valueOf(j), Buffer.DataType.EVENT_BUFFER));
        HsSubpartitionConsumerMemoryDataManager registerNewConsumer = createSubpartitionMemoryDataManager.registerNewConsumer(HsConsumerId.DEFAULT);
        ArrayList arrayList3 = new ArrayList();
        for (int i3 = 0; i3 < 11; i3++) {
            arrayList3.add(registerNewConsumer.consumeBuffer(i3, Collections.emptyList()));
        }
        checkConsumedBufferAndNextDataType(10, bufferDecompressor, arrayList2, arrayList3);
        Assertions.assertThat(arrayList).zipSatisfy(HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, IntStream.range(0, 11).toArray()), (bufferIndexAndChannel, bufferIndexAndChannel2) -> {
            Assertions.assertThat(bufferIndexAndChannel.getChannel()).isEqualTo(bufferIndexAndChannel2.getChannel());
            Assertions.assertThat(bufferIndexAndChannel.getBufferIndex()).isEqualTo(bufferIndexAndChannel2.getBufferIndex());
        });
    }

    /**
     * Appends four single-record buffers, starts spilling buffers 1 and 2, consumes buffers 0 and
     * 1, and then checks which buffer indexes are returned for each spill/consume status filter.
     */
    @Test
    void testGetBuffersSatisfyStatus() throws Exception {
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> HybridShuffleTestUtils.createBufferBuilder(RECORD_SIZE))
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        HsSubpartitionConsumerMemoryDataManager consumer = dataManager.registerNewConsumer(HsConsumerId.DEFAULT);
        for (int value = 0; value < 4; value++) {
            dataManager.append(createRecord(value), Buffer.DataType.DATA_BUFFER);
        }

        // Spill is started for buffers 1 and 2; the future is never completed in this test.
        dataManager.spillSubpartitionBuffers(
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 1, 2), new CompletableFuture<>());
        consumer.consumeBuffer(0, Collections.emptyList());
        consumer.consumeBuffer(1, Collections.emptyList());

        HsSpillingInfoProvider.ConsumeStatusWithId anyConsumeStatus = HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY;
        HsSpillingInfoProvider.ConsumeStatusWithId consumed = HsSpillingInfoProvider.ConsumeStatusWithId
                .fromStatusAndConsumerId(HsSpillingInfoProvider.ConsumeStatus.CONSUMED, HsConsumerId.DEFAULT);
        HsSpillingInfoProvider.ConsumeStatusWithId notConsumed = HsSpillingInfoProvider.ConsumeStatusWithId
                .fromStatusAndConsumerId(HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED, HsConsumerId.DEFAULT);

        // Any spill status.
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.ALL, anyConsumeStatus),
                Arrays.asList(0, 1, 2, 3));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.ALL, consumed),
                Arrays.asList(0, 1));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.ALL, notConsumed),
                Arrays.asList(2, 3));

        // Any consume status.
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.SPILL, anyConsumeStatus),
                Arrays.asList(1, 2));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.NOT_SPILL, anyConsumeStatus),
                Arrays.asList(0, 3));

        // Combined filters.
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.SPILL, notConsumed),
                Collections.singletonList(2));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.SPILL, consumed),
                Collections.singletonList(1));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.NOT_SPILL, consumed),
                Collections.singletonList(0));
        checkBufferIndex(
                dataManager.getBuffersSatisfyStatus(HsSpillingInfoProvider.SpillStatus.NOT_SPILL, notConsumed),
                Collections.singletonList(3));
    }

    /**
     * Spills three single-record buffers and checks that the returned buffers match the requested
     * index/channel pairs, that each buffer's reference count is 2 while the spill future is
     * pending, and that it drops to 1 once the future completes.
     */
    @Test
    void testSpillSubpartitionBuffers() throws Exception {
        CompletableFuture<Void> spillDoneFuture = new CompletableFuture<>();
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> HybridShuffleTestUtils.createBufferBuilder(RECORD_SIZE))
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        final int numBuffers = 3;
        for (int value = 0; value < numBuffers; value++) {
            dataManager.append(createRecord(value), Buffer.DataType.DATA_BUFFER);
        }

        List<BufferIndexAndChannel> toSpill = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2);
        // Must be typed: with a raw List the zipSatisfy lambda parameter degrades to Object.
        List<BufferWithIdentity> spilledBuffers = dataManager.spillSubpartitionBuffers(toSpill, spillDoneFuture);
        Assertions.assertThat(toSpill).zipSatisfy(spilledBuffers, (expected, spilled) -> {
            Assertions.assertThat(expected.getBufferIndex()).isEqualTo(spilled.getBufferIndex());
            Assertions.assertThat(expected.getChannel()).isEqualTo(spilled.getChannelIndex());
        });

        List<Integer> expectedValues = Arrays.asList(0, 1, 2);
        // While the spill is pending each buffer is expected to carry one extra reference.
        checkBuffersRefCountAndValue(spilledBuffers, Arrays.asList(2, 2, 2), expectedValues);
        spillDoneFuture.complete(null);
        checkBuffersRefCountAndValue(spilledBuffers, Arrays.asList(1, 1, 1), expectedValues);
    }

    /**
     * Starts spilling only the last of three buffers, releases all three, and checks that the two
     * non-spilling buffers are recycled immediately while the spilling one is recycled and marked
     * readable only after the spill future completes.
     */
    @Test
    void testReleaseAndMarkReadableSubpartitionBuffers() throws Exception {
        final int targetChannel = 0;
        List<Integer> readableBufferIndexes = new ArrayList<>();
        List<MemorySegment> recycledSegments = new ArrayList<>();
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> {
                    MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(this.bufferSize);
                    // The recycler records every segment handed back, in recycle order.
                    return new BufferBuilder(segment, recycledSegments::add);
                })
                .setMarkBufferReadableConsumer((channel, bufferIndex) -> {
                    Assertions.assertThat(channel).isEqualTo(targetChannel);
                    readableBufferIndexes.add(bufferIndex);
                })
                .build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        for (int value = 0; value < 3; value++) {
            dataManager.append(createRecord(value), Buffer.DataType.DATA_BUFFER);
        }

        List<BufferIndexAndChannel> allBuffers = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2);
        CompletableFuture<Void> spillDoneFuture = new CompletableFuture<>();
        // Spill only the last buffer, then release all of them.
        dataManager.spillSubpartitionBuffers(allBuffers.subList(2, 3), spillDoneFuture);
        dataManager.releaseSubpartitionBuffers(allBuffers);

        // Nothing is readable yet; only the buffers that never started spilling are recycled.
        Assertions.assertThat(readableBufferIndexes).isEmpty();
        checkMemorySegmentValue(recycledSegments, Arrays.asList(0, 1));

        // Completing the spill marks the last buffer readable and recycles it as well.
        spillDoneFuture.complete(null);
        Assertions.assertThat(readableBufferIndexes).containsExactly(2);
        checkMemorySegmentValue(recycledSegments, Arrays.asList(0, 1, 2));
    }

    /**
     * Appends a half-buffer data record followed by a serialized end-of-partition event and checks
     * that the output metrics report two buffers and the sum of both payload sizes in bytes.
     */
    @Test
    void testMetricsUpdate() throws Exception {
        final int recordSize = this.bufferSize / 2;
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(() -> HybridShuffleTestUtils.createBufferBuilder(this.bufferSize))
                .build();
        HsOutputMetrics metrics = HybridShuffleTestUtils.createTestingOutputMetrics();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        // Replace the metrics installed by the factory method with the instance inspected below.
        dataManager.setOutputMetrics(metrics);

        dataManager.append(ByteBuffer.allocate(recordSize), Buffer.DataType.DATA_BUFFER);
        int eventSize = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining();
        ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
        dataManager.append(serializedEvent, Buffer.DataType.EVENT_BUFFER);

        Assertions.assertThat(metrics.getNumBuffersOut().getCount()).isEqualTo(2L);
        Assertions.assertThat(metrics.getNumBytesOut().getCount()).isEqualTo(recordSize + eventSize);
    }

    /** Registering the same consumer id twice must fail with an {@link IllegalStateException}. */
    @Test
    void testConsumerRegisterRepeatedly() {
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder().build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        HsConsumerId consumerId = HsConsumerId.newId((HsConsumerId) null);

        dataManager.registerNewConsumer(consumerId);

        Assertions.assertThatThrownBy(() -> dataManager.registerNewConsumer(consumerId))
                .isInstanceOf(IllegalStateException.class);
    }

    /** After a consumer has been released, registering the same consumer id again must succeed. */
    @Test
    void testRegisterAndReleaseConsumer() {
        TestingMemoryDataManagerOperation operation = TestingMemoryDataManagerOperation.builder().build();
        HsSubpartitionMemoryDataManager dataManager = createSubpartitionMemoryDataManager(operation);
        HsConsumerId consumerId = HsConsumerId.newId((HsConsumerId) null);

        dataManager.registerNewConsumer(consumerId);
        dataManager.releaseConsumer(consumerId);

        Assertions.assertThatNoException().isThrownBy(() -> dataManager.registerNewConsumer(consumerId));
    }

    /**
     * Asserts that the buffer indexes of the given entries, in iteration order, equal the
     * expected list.
     *
     * @param deque entries whose buffer indexes are extracted
     * @param list expected buffer indexes, in order
     */
    private static void checkBufferIndex(Deque<BufferIndexAndChannel> deque, List<Integer> list) {
        List<Integer> actualIndexes = deque.stream()
                .map(BufferIndexAndChannel::getBufferIndex)
                .collect(Collectors.toList());
        Assertions.assertThat(actualIndexes).isEqualTo(list);
    }

    /**
     * Asserts that there are exactly as many segments as expected values and that the int stored
     * at offset 0 of each segment equals the expected value at the same position.
     *
     * @param list memory segments to inspect
     * @param list2 expected leading int value of each segment, in order
     */
    private static void checkMemorySegmentValue(List<MemorySegment> list, List<Integer> list2) {
        // Without this check a missing segment would let the loop below pass vacuously.
        Assertions.assertThat(list).hasSize(list2.size());
        for (int position = 0; position < list.size(); position++) {
            int leadingValue = list.get(position).getInt(0);
            Assertions.assertThat(leadingValue).isEqualTo(list2.get(position));
        }
    }

    /**
     * Asserts that every consumed buffer is present, holds the expected records with the expected
     * data type, and announces the data type of the record following it ({@code NONE} for the
     * last buffer).
     *
     * @param i number of records each preceding buffer accounts for; buffer {@code k} is compared
     *     against the expected records starting at index {@code k * i}
     * @param bufferDecompressor used for compressed buffers; asserted non-null when one is met
     * @param list expected record values and data types, in append order
     * @param list2 consumed buffers, in consumption order
     */
    private static void checkConsumedBufferAndNextDataType(int i, BufferDecompressor bufferDecompressor, List<Tuple2<Long, Buffer.DataType>> list, List<Optional<ResultSubpartition.BufferAndBacklog>> list2) {
        final int lastPosition = list2.size() - 1;
        for (int position = 0; position < list2.size(); position++) {
            final int firstRecordIndex = position * i;
            final boolean isLastBuffer = position == lastPosition;
            Assertions.assertThat(list2.get(position)).hasValueSatisfying(consumed ->
                    verifyConsumedBuffer(consumed, firstRecordIndex, isLastBuffer, bufferDecompressor, list));
        }
    }

    /**
     * Verifies a single consumed buffer against the expected records starting at
     * {@code firstRecordIndex}, checks its announced next data type, and recycles the buffer.
     */
    private static void verifyConsumedBuffer(ResultSubpartition.BufferAndBacklog consumed, int firstRecordIndex, boolean isLastBuffer, BufferDecompressor decompressor, List<Tuple2<Long, Buffer.DataType>> expectedRecords) {
        Buffer buffer = consumed.buffer();
        if (buffer.isCompressed()) {
            Assertions.assertThat(decompressor).isNotNull();
            buffer = decompressor.decompressToIntermediateBuffer(buffer);
        }
        ByteBuffer readable = buffer.getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
        int recordIndex = firstRecordIndex;
        for (; readable.hasRemaining(); recordIndex++) {
            long actualValue = readable.getLong();
            Buffer.DataType actualType = buffer.getDataType();
            Assertions.assertThat(actualValue).isEqualTo(expectedRecords.get(recordIndex).f0);
            Assertions.assertThat(actualType).isEqualTo(expectedRecords.get(recordIndex).f1);
        }
        // recordIndex now points at the first record of the following buffer, if there is one.
        if (isLastBuffer) {
            Assertions.assertThat(consumed.getNextDataType()).isEqualTo(Buffer.DataType.NONE);
        } else {
            Assertions.assertThat(consumed.getNextDataType()).isEqualTo(expectedRecords.get(recordIndex).f1);
        }
        buffer.recycleBuffer();
    }

    /**
     * Asserts, position by position, that each buffer starts with the expected little-endian int
     * and has the expected reference count.
     *
     * @param list buffers to inspect
     * @param list2 expected reference count of each buffer
     * @param list3 expected leading int value of each buffer
     */
    private static void checkBuffersRefCountAndValue(List<BufferWithIdentity> list, List<Integer> list2, List<Integer> list3) {
        int position = 0;
        for (BufferWithIdentity identified : list) {
            Buffer buffer = identified.getBuffer();
            ByteBuffer readable = buffer.getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
            Assertions.assertThat(readable.getInt()).isEqualTo(list3.get(position));
            Assertions.assertThat(buffer.refCnt()).isEqualTo(list2.get(position));
            position++;
        }
    }

    /**
     * Creates a data manager without a buffer compressor.
     *
     * @param hsMemoryDataManagerOperation operation callbacks handed to the data manager
     * @return the data manager built by the two-argument overload with a {@code null} compressor
     */
    private HsSubpartitionMemoryDataManager createSubpartitionMemoryDataManager(HsMemoryDataManagerOperation hsMemoryDataManagerOperation) {
        final BufferCompressor noCompressor = null;
        return createSubpartitionMemoryDataManager(hsMemoryDataManagerOperation, noCompressor);
    }

    /**
     * Creates a data manager for subpartition 0 using the current {@code bufferSize}, the shared
     * read lock, and a fresh testing output-metrics instance.
     *
     * @param hsMemoryDataManagerOperation operation callbacks handed to the data manager
     * @param bufferCompressor compressor to pass to the data manager, or {@code null} for none
     * @return the configured data manager
     */
    private HsSubpartitionMemoryDataManager createSubpartitionMemoryDataManager(HsMemoryDataManagerOperation hsMemoryDataManagerOperation, @Nullable BufferCompressor bufferCompressor) {
        HsSubpartitionMemoryDataManager dataManager = new HsSubpartitionMemoryDataManager(
                SUBPARTITION_ID,
                this.bufferSize,
                lock.readLock(),
                bufferCompressor,
                hsMemoryDataManagerOperation);
        dataManager.setOutputMetrics(HybridShuffleTestUtils.createTestingOutputMetrics());
        return dataManager;
    }

    /**
     * Builds a {@code RECORD_SIZE}-byte record holding the given value in little-endian order.
     *
     * @param j value to encode
     * @return a buffer flipped for reading, positioned at the start of the encoded value
     */
    private static ByteBuffer createRecord(long j) {
        ByteBuffer record = ByteBuffer.allocate(RECORD_SIZE).order(ByteOrder.LITTLE_ENDIAN);
        record.putLong(j);
        record.flip();
        return record;
    }
}
