package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.class */
class HsResultPartitionTest {
    /** Segment size in bytes handed to the network buffer pool, the read buffer pool and each partition under test. */
    private static final int bufferSize = 1024;
    /** Number of segments in the global {@link NetworkBufferPool} created in {@code before()}. */
    private static final int totalBuffers = 1000;
    /** Byte budget (32 MiB) of the {@link BatchShuffleReadBufferPool} created in {@code before()}. */
    private static final int totalBytes = 33554432;
    /** Thread count of the scheduled read executor created in {@code before()}. */
    private static final int numThreads = 4;
    /** Created in {@code before()} on top of {@link #tempDataPath}; closed in {@code after()}. */
    private FileChannelManager fileChannelManager;
    /** Global pool that every test carves its local {@link BufferPool} from; destroyed in {@code after()}. */
    private NetworkBufferPool globalPool;
    /** Read buffer pool passed to every partition under test; destroyed in {@code after()}. */
    private BatchShuffleReadBufferPool readBufferPool;
    /** Executor passed to every partition under test; shut down in {@code after()}. */
    private ScheduledExecutorService readIOExecutor;
    /** Replaced with a fresh unregistered metric group each time a partition is created. */
    private TaskIOMetricGroup taskIOMetricGroup;

    /** JUnit-managed temporary directory used as the single root of the file channel manager. */
    @TempDir
    public Path tempDataPath;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest$TestingBufferAvailabilityListener.class */
    public static final class TestingBufferAvailabilityListener implements BufferAvailabilityListener {
        private int numNotifications;

        private TestingBufferAvailabilityListener() {
        }

        public synchronized void notifyDataAvailable() {
            if (this.numNotifications == 0) {
                notifyAll();
            }
            this.numNotifications++;
        }

        public synchronized void waitForData() throws InterruptedException {
            if (this.numNotifications == 0) {
                wait();
            }
            this.numNotifications = 0;
        }
    }

    /** Package-private no-arg constructor; all fixtures are created in {@code before()}. */
    HsResultPartitionTest() {}

    /**
     * Builds the per-test fixtures: a file channel manager rooted at the temp directory, the
     * global network buffer pool, the read buffer pool and the read executor.
     */
    @BeforeEach
    void before() {
        final String[] tempDirs = {this.tempDataPath.toString()};
        this.fileChannelManager = new FileChannelManagerImpl(tempDirs, "testing");
        this.globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
        // Use the declared constants rather than repeating their values as magic numbers.
        this.readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        this.readIOExecutor = Executors.newScheduledThreadPool(numThreads);
    }

    /**
     * Tears down the per-test fixtures in the same order as before, but guarantees that every
     * step is attempted even when an earlier one throws, so one failing cleanup cannot leak the
     * remaining pools or the executor into later tests.
     *
     * @throws Exception if any cleanup step fails (a later failure replaces an earlier one)
     */
    @AfterEach
    void after() throws Exception {
        try {
            this.fileChannelManager.close();
        } finally {
            try {
                this.globalPool.destroy();
            } finally {
                try {
                    this.readBufferPool.destroy();
                } finally {
                    this.readIOExecutor.shutdown();
                }
            }
        }
    }

    /**
     * Writes random records (randomly broadcast or sent to a single subpartition), finishes the
     * partition, reads every subpartition back and checks that the bytes and events read match
     * what was written.
     */
    @Test
    void testEmit() throws Exception {
        final int numBuffers = 100;
        final int numSubpartitions = 10;
        final int numRecords = 1000;
        final Random random = new Random();

        // try-with-resources closes the partition on both the success and the failure path.
        try (HsResultPartition partition =
                createHsResultPartition(
                        numSubpartitions, this.globalPool.createBufferPool(numBuffers, numBuffers))) {
            @SuppressWarnings("unchecked") // generic array creation; only typed queues are stored
            Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten = new Queue[numSubpartitions];
            @SuppressWarnings("unchecked") // generic array creation; only typed queues are stored
            Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
            for (int i = 0; i < numSubpartitions; i++) {
                dataWritten[i] = new ArrayDeque<>();
                buffersRead[i] = new ArrayDeque<>();
            }

            // Freshly allocated int arrays are already zero-filled.
            int[] numBytesWritten = new int[numSubpartitions];
            int[] numBytesRead = new int[numSubpartitions];

            for (int i = 0; i < numRecords; i++) {
                ByteBuffer record = generateRandomData(random.nextInt(2 * bufferSize) + 1, random);
                if (random.nextBoolean()) {
                    partition.broadcastRecord(record);
                    for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                        recordDataWritten(
                                record,
                                dataWritten,
                                subpartition,
                                numBytesWritten,
                                Buffer.DataType.DATA_BUFFER);
                    }
                } else {
                    int subpartition = random.nextInt(numSubpartitions);
                    partition.emitRecord(record, subpartition);
                    recordDataWritten(
                            record,
                            dataWritten,
                            subpartition,
                            numBytesWritten,
                            Buffer.DataType.DATA_BUFFER);
                }
            }

            partition.finish();
            // Account for the end-of-partition event expected on every subpartition.
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                recordDataWritten(
                        EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE),
                        dataWritten,
                        subpartition,
                        numBytesWritten,
                        Buffer.DataType.EVENT_BUFFER);
            }

            readData(
                    createSubpartitionViews(partition, numSubpartitions),
                    (buffer, subpartition) -> {
                        int size = buffer.readableBytes();
                        numBytesRead[subpartition] += size;

                        // Copy the payload: readData recycles the buffer after this callback.
                        MemorySegment copy = MemorySegmentFactory.allocateUnpooledSegment(size);
                        copy.put(0, buffer.getNioBufferReadable(), size);
                        buffersRead[subpartition].add(
                                new NetworkBuffer(copy, segment -> {}, buffer.getDataType(), size));
                    });

            checkWriteReadResult(
                    numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
        }
    }

    /**
     * Broadcasts an {@link EndOfPartitionEvent} and checks that the single pooled segment is
     * still available afterwards and that both subpartitions deliver the event.
     */
    @Test
    void testBroadcastEvent() throws Exception {
        final int numBuffers = 1;
        final int numSubpartitions = 2;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);

        // try-with-resources closes the partition on both the success and the failure path.
        try (HsResultPartition partition = createHsResultPartition(numSubpartitions, bufferPool)) {
            partition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(1);

            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
                    createSubpartitionViews(partition, numSubpartitions);
            boolean[] receivedEvent = new boolean[numSubpartitions];
            readData(
                    viewAndListeners,
                    (buffer, subpartition) -> {
                        Assertions.assertThat(buffer.getDataType().isEvent()).isTrue();
                        try {
                            Assertions.assertThat(
                                            EventSerializer.fromSerializedEvent(
                                                    buffer.readOnlySlice().getNioBufferReadable(),
                                                    HsResultPartitionTest.class.getClassLoader()))
                                    .isInstanceOf(EndOfPartitionEvent.class);
                            receivedEvent[subpartition] = true;
                        } catch (IOException e) {
                            // BiConsumer cannot throw checked exceptions; keep the cause.
                            throw new RuntimeException(e);
                        }
                    });

            Assertions.assertThat(receivedEvent).containsExactly(new boolean[] {true, true});
        }
    }

    /** Checks that emitting a record into an already closed partition throws. */
    @Test
    void testClose() throws Exception {
        final int numBuffers = 1;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition closedPartition = createHsResultPartition(1, bufferPool);
        closedPartition.close();

        // assertThatThrownBy itself fails the test when the callable completes normally.
        Assertions.assertThatThrownBy(
                () -> closedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0));
    }

    /**
     * Emits a five-segment record, closes and releases the partition, waits until the channel
     * directory is empty, and checks that every segment is back in the global pool.
     *
     * <p>The wait loop has no bound of its own; the 30 second {@link Timeout} bounds it.
     */
    @Timeout(30)
    @Test
    void testRelease() throws Exception {
        final int numBuffers = 10;
        final int numSubpartitions = 2;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, this.readBufferPool.getNumBuffersPerRequest())
                        .setFullStrategyNumBuffersTriggerSpillingRatio(0.6f)
                        .setFullStrategyReleaseBufferRatio(0.8f)
                        .build();
        HsResultPartition partition =
                createHsResultPartition(numSubpartitions, bufferPool, configuration);

        partition.emitRecord(ByteBuffer.allocate(5 * bufferSize), 1);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(5);

        partition.close();
        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();

        partition.release();
        // Poll until the channel directory no longer contains any file.
        while (Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].listFiles()).length
                != 0) {
            Thread.sleep(10L);
        }

        // Observed value is the actual, the constant is the expected, so a failure message
        // reports them the right way round.
        Assertions.assertThat(this.globalPool.getNumberOfAvailableMemorySegments())
                .isEqualTo(totalBuffers);
    }

    /** Checks that creating a subpartition view on a released partition is rejected. */
    @Test
    void testCreateSubpartitionViewAfterRelease() throws Exception {
        final int numBuffers = 10;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition releasedPartition = createHsResultPartition(2, bufferPool);
        releasedPartition.release();

        Assertions.assertThatThrownBy(
                        () ->
                                releasedPartition.createSubpartitionView(
                                        0, new NoOpBufferAvailablityListener()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Deletes the temp data directory after the partition was created and checks that creating a
     * subpartition view then fails with {@link PartitionNotFoundException}.
     */
    @Test
    void testCreateSubpartitionViewLostData() throws Exception {
        final int numBuffers = 10;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition partition = createHsResultPartition(2, bufferPool);

        IOUtils.deleteFilesRecursively(this.tempDataPath);

        Assertions.assertThatThrownBy(
                        () ->
                                partition.createSubpartitionView(
                                        0, new NoOpBufferAvailablityListener()))
                .isInstanceOf(PartitionNotFoundException.class);
    }

    /**
     * Checks that the partition reports unavailable after a record as large as the whole local
     * pool was emitted, and available again once it has been closed and released.
     */
    @Test
    void testAvailability() throws Exception {
        final int numBuffers = 2;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition partition = createHsResultPartition(1, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(2 * bufferSize), 0);
        Assertions.assertThat(partition.isAvailable()).isFalse();

        partition.close();
        partition.release();
        Assertions.assertThat(partition.isAvailable()).isTrue();
    }

    /**
     * Emits one record to subpartition 0 and broadcasts one record to both subpartitions, then
     * checks the buffer and byte counters of the task IO metric group.
     */
    @Test
    void testMetricsUpdate() throws Exception {
        final int numBuffers = 3;
        final int numSubpartitions = 2;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);

        // try-with-resources closes the partition on both the success and the failure path.
        try (HsResultPartition partition = createHsResultPartition(numSubpartitions, bufferPool)) {
            partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));

            Assertions.assertThat(this.taskIOMetricGroup.getNumBuffersOutCounter().getCount())
                    .isEqualTo(3L);
            Assertions.assertThat(this.taskIOMetricGroup.getNumBytesOutCounter().getCount())
                    .isEqualTo(3072L);
            Assertions.assertThat(
                            this.taskIOMetricGroup
                                    .createSnapshot()
                                    .getNumBytesProducedOfPartitions())
                    .hasSize(1)
                    .containsValue(2048L);
        }
    }

    /**
     * Rewinds {@code byteBuffer}, appends it with its data type to the queue of subpartition
     * {@code i}, and adds its remaining byte count to that subpartition's running total.
     *
     * @param byteBuffer the written record or serialized event; its position is reset to zero
     * @param queueArr per-subpartition queues of written data
     * @param i index of the target subpartition
     * @param iArr per-subpartition totals of written bytes
     * @param dataType data type recorded alongside the buffer
     */
    private static void recordDataWritten(ByteBuffer byteBuffer, Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] queueArr, int i, int[] iArr, Buffer.DataType dataType) {
        byteBuffer.rewind();
        final int writtenBytes = byteBuffer.remaining();
        queueArr[i].add(Tuple2.of(byteBuffer, dataType));
        iArr[i] += writtenBytes;
    }

    /**
     * Drains every subpartition view on its own {@link CheckedThread} until that view delivers a
     * non-data buffer (an event), handing each buffer to {@code biConsumer} before recycling it.
     *
     * @param tuple2Arr one (view, listener) pair per subpartition
     * @param biConsumer callback receiving each buffer and its subpartition index; it is invoked
     *     from the reader threads and must copy anything it needs, since the buffer is recycled
     *     right after the call
     * @return total number of readable bytes seen across all subpartitions
     * @throws Exception if any reader thread failed, as rethrown by {@code CheckedThread.sync()}
     */
    private long readData(final Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] tuple2Arr, final BiConsumer<Buffer, Integer> biConsumer) throws Exception {
        final AtomicInteger totalBytesRead = new AtomicInteger(0);
        final CheckedThread[] readers = new CheckedThread[tuple2Arr.length];

        for (int i = 0; i < tuple2Arr.length; i++) {
            final int subpartition = i;
            readers[i] =
                    new CheckedThread() {
                        @Override
                        public void go() throws Exception {
                            final ResultSubpartitionView view = tuple2Arr[subpartition].f0;
                            final TestingBufferAvailabilityListener listener =
                                    tuple2Arr[subpartition].f1;
                            while (true) {
                                ResultSubpartition.BufferAndBacklog next = view.getNextBuffer();
                                if (next == null) {
                                    listener.waitForData();
                                    continue;
                                }

                                Buffer buffer = next.buffer();
                                biConsumer.accept(buffer, subpartition);
                                totalBytesRead.addAndGet(buffer.readableBytes());
                                // Read the flag before recycling so a recycled buffer is never
                                // inspected.
                                final boolean isEvent = !buffer.isBuffer();
                                buffer.recycleBuffer();

                                if (isEvent) {
                                    // An event ends this subpartition's stream.
                                    view.releaseAllResources();
                                    return;
                                }
                                if (next.getNextDataType() == Buffer.DataType.NONE) {
                                    listener.waitForData();
                                }
                            }
                        }
                    };
            readers[i].start();
        }

        for (CheckedThread reader : readers) {
            reader.sync();
        }
        return totalBytesRead.get();
    }

    /**
     * Creates a heap buffer of {@code i} bytes filled from {@code random}.
     *
     * @param i number of bytes to generate
     * @param random source of the bytes
     * @return a buffer wrapping the generated array, positioned at zero with limit {@code i}
     */
    private static ByteBuffer generateRandomData(int i, Random random) {
        final byte[] payload = new byte[i];
        random.nextBytes(payload);
        final ByteBuffer wrapped = ByteBuffer.wrap(payload);
        return wrapped;
    }

    /**
     * Creates a partition with {@code i} subpartitions using a default-built
     * {@link HybridShuffleConfiguration}.
     *
     * @param i number of subpartitions
     * @param bufferPool local buffer pool supplied to the partition
     * @return the partition, already set up
     * @throws IOException propagated from the three-argument overload
     */
    private HsResultPartition createHsResultPartition(int i, BufferPool bufferPool) throws IOException {
        final HybridShuffleConfiguration defaultConfiguration =
                HybridShuffleConfiguration.builder(i, this.readBufferPool.getNumBuffersPerRequest())
                        .build();
        return createHsResultPartition(i, bufferPool, defaultConfiguration);
    }

    /**
     * Creates a {@code HYBRID_FULL} partition with {@code i} subpartitions backed by a new file
     * channel, sets it up, and attaches a fresh unregistered IO metric group that is also stored
     * in {@link #taskIOMetricGroup}.
     *
     * @param i number of subpartitions (passed for both subpartition-count arguments)
     * @param bufferPool local buffer pool returned by the partition's pool supplier
     * @param hybridShuffleConfiguration configuration handed to the partition
     * @return the partition, already set up and wired to the metric group
     * @throws IOException declared by the partition setup path
     */
    private HsResultPartition createHsResultPartition(int i, BufferPool bufferPool, HybridShuffleConfiguration hybridShuffleConfiguration) throws IOException {
        final HsResultPartition partition =
                new HsResultPartition(
                        "HsResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_FULL,
                        i,
                        i,
                        this.readBufferPool,
                        this.readIOExecutor,
                        new ResultPartitionManager(),
                        this.fileChannelManager.createChannel().getPath(),
                        bufferSize,
                        hybridShuffleConfiguration,
                        (BufferCompressor) null,
                        () -> bufferPool);

        final TaskIOMetricGroup ioMetrics =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        this.taskIOMetricGroup = ioMetrics;

        partition.setup();
        partition.setMetricGroup(ioMetrics);
        return partition;
    }

    /**
     * For each of the first {@code i} subpartitions, asserts that the byte totals match, that the
     * concatenated written bytes equal the concatenated read bytes, and that the written events
     * match the read events pairwise in data type and content.
     *
     * @param i number of subpartitions to check
     * @param iArr bytes written per subpartition
     * @param iArr2 bytes read per subpartition
     * @param queueArr written buffers with their data types, per subpartition
     * @param queueArr2 buffers read back, per subpartition
     */
    private static void checkWriteReadResult(int i, int[] iArr, int[] iArr2, Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] queueArr, Queue<Buffer>[] queueArr2) {
        for (int subpartition = 0; subpartition < i; subpartition++) {
            Assertions.assertThat(iArr[subpartition]).isEqualTo(iArr2[subpartition]);

            ArrayList<Tuple2<ByteBuffer, Buffer.DataType>> eventsWritten = new ArrayList<>();
            ArrayList<Buffer> eventsRead = new ArrayList<>();

            ByteBuffer allWritten = ByteBuffer.allocate(iArr[subpartition]);
            for (Tuple2<ByteBuffer, Buffer.DataType> written : queueArr[subpartition]) {
                allWritten.put(written.f0);
                // put() consumed the source buffer; rewind it for the event comparison below.
                written.f0.rewind();
                if (written.f1.isEvent()) {
                    eventsWritten.add(written);
                }
            }

            ByteBuffer allRead = ByteBuffer.allocate(iArr2[subpartition]);
            for (Buffer read : queueArr2[subpartition]) {
                allRead.put(read.getNioBufferReadable());
                if (!read.isBuffer()) {
                    eventsRead.add(read);
                }
            }

            allWritten.flip();
            allRead.flip();
            Assertions.assertThat(allWritten).isEqualTo(allRead);

            Assertions.assertThat(eventsWritten.size()).isEqualTo(eventsRead.size());
            for (int event = 0; event < eventsWritten.size(); event++) {
                Tuple2<ByteBuffer, Buffer.DataType> written = eventsWritten.get(event);
                Buffer read = eventsRead.get(event);
                Assertions.assertThat(written.f1).isEqualTo(read.getDataType());
                Assertions.assertThat(written.f0).isEqualTo(read.getNioBufferReadable());
            }
        }
    }

    /**
     * Creates one subpartition view per index in {@code [0, i)}, each paired with its own new
     * {@link TestingBufferAvailabilityListener}.
     *
     * @param hsResultPartition partition to create the views on
     * @param i number of subpartitions
     * @return array of (view, listener) pairs indexed by subpartition
     * @throws Exception propagated from {@code createSubpartitionView}
     */
    private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] createSubpartitionViews(HsResultPartition hsResultPartition, int i) throws Exception {
        Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewsWithListeners = new Tuple2[i];
        int subpartition = 0;
        while (subpartition < i) {
            final TestingBufferAvailabilityListener listener = new TestingBufferAvailabilityListener();
            final ResultSubpartitionView view = hsResultPartition.createSubpartitionView(subpartition, listener);
            viewsWithListeners[subpartition] = Tuple2.of(view, listener);
            subpartition++;
        }
        return viewsWithListeners;
    }
}
