package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.class */
abstract class BoundedDataTestBase {
    private Path subpartitionDataPath;

    @Parameter
    private static boolean compressionEnabled;
    private static final NettyShuffleEnvironmentOptions.CompressionCodec COMPRESSION_CODEC = NettyShuffleEnvironmentOptions.CompressionCodec.LZ4;
    protected static final int BUFFER_SIZE = 1048576;
    private static final BufferCompressor COMPRESSOR = new BufferCompressor(BUFFER_SIZE, COMPRESSION_CODEC);
    private static final BufferDecompressor DECOMPRESSOR = new BufferDecompressor(BUFFER_SIZE, COMPRESSION_CODEC);

    @Parameters(name = "compressionEnabled = {0}")
    private static List<Boolean> compressionEnabled() {
        return Arrays.asList(false, true);
    }

    @BeforeEach
    void before(@TempDir Path path) {
        this.subpartitionDataPath = path.resolve("subpartitiondata");
    }

    protected abstract boolean isRegionBased();

    protected abstract BoundedData createBoundedData(Path path) throws IOException;

    protected abstract BoundedData createBoundedDataWithRegion(Path path, int i) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public BoundedData createBoundedData() throws IOException {
        return createBoundedData(this.subpartitionDataPath);
    }

    private BoundedData createBoundedDataWithRegion(int i) throws IOException {
        return createBoundedDataWithRegion(this.subpartitionDataPath, i);
    }

    @TestTemplate
    void testWriteAndReadData() throws Exception {
        BoundedData createBoundedData = createBoundedData();
        try {
            testWriteAndReadData(createBoundedData);
            if (createBoundedData != null) {
                createBoundedData.close();
            }
        } catch (Throwable th) {
            if (createBoundedData != null) {
                try {
                    createBoundedData.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testWriteAndReadDataAcrossRegions() throws Exception {
        if (isRegionBased()) {
            BoundedData createBoundedDataWithRegion = createBoundedDataWithRegion(1276347);
            try {
                testWriteAndReadData(createBoundedDataWithRegion);
                if (createBoundedDataWithRegion != null) {
                    createBoundedDataWithRegion.close();
                }
            } catch (Throwable th) {
                if (createBoundedDataWithRegion != null) {
                    try {
                        createBoundedDataWithRegion.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private void testWriteAndReadData(BoundedData boundedData) throws Exception {
        int writeLongs = writeLongs(boundedData, 10000000);
        boundedData.finishWrite();
        readLongs(boundedData.createReader(), writeLongs, 10000000);
    }

    @TestTemplate
    void returnNullAfterEmpty() throws Exception {
        BoundedData createBoundedData = createBoundedData();
        try {
            createBoundedData.finishWrite();
            BoundedData.Reader createReader = createBoundedData.createReader();
            Assertions.assertThat(createReader.nextBuffer()).isNull();
            Assertions.assertThat(createReader.nextBuffer()).isNull();
            Assertions.assertThat(createReader.nextBuffer()).isNull();
            if (createBoundedData != null) {
                createBoundedData.close();
            }
        } catch (Throwable th) {
            if (createBoundedData != null) {
                try {
                    createBoundedData.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testDeleteFileOnClose() throws Exception {
        BoundedData createBoundedData = createBoundedData(this.subpartitionDataPath);
        Assertions.assertThat(this.subpartitionDataPath).exists();
        createBoundedData.close();
        Assertions.assertThat(this.subpartitionDataPath).doesNotExist();
    }

    @TestTemplate
    void testGetSizeSingleRegion() throws Exception {
        BoundedData createBoundedData = createBoundedData();
        try {
            testGetSize(createBoundedData, 60787, 76687);
            if (createBoundedData != null) {
                createBoundedData.close();
            }
        } catch (Throwable th) {
            if (createBoundedData != null) {
                try {
                    createBoundedData.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testGetSizeMultipleRegions() throws Exception {
        if (isRegionBased()) {
            int systemPageSizeOrConservativeMultiple = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();
            BoundedData createBoundedDataWithRegion = createBoundedDataWithRegion(systemPageSizeOrConservativeMultiple);
            try {
                testGetSize(createBoundedDataWithRegion, systemPageSizeOrConservativeMultiple / 3, systemPageSizeOrConservativeMultiple - 8);
                if (createBoundedDataWithRegion != null) {
                    createBoundedDataWithRegion.close();
                }
            } catch (Throwable th) {
                if (createBoundedDataWithRegion != null) {
                    try {
                        createBoundedDataWithRegion.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private static void testGetSize(BoundedData boundedData, int i, int i2) throws Exception {
        int i3 = i + i2 + 16;
        boundedData.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(i));
        Assertions.assertThat(boundedData.getSize()).isEqualTo(i + 8);
        boundedData.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(i2));
        Assertions.assertThat(boundedData.getSize()).isEqualTo(i3);
        boundedData.finishWrite();
        Assertions.assertThat(boundedData.getSize()).isEqualTo(i3);
    }

    private static int writeLongs(BoundedData boundedData, int i) throws IOException {
        int i2 = 0;
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= i) {
                return i2;
            }
            Buffer buildBufferWithAscendingLongs = BufferBuilderTestUtils.buildBufferWithAscendingLongs(BUFFER_SIZE, 131072, j2);
            if (compressionEnabled) {
                Buffer compressToIntermediateBuffer = COMPRESSOR.compressToIntermediateBuffer(buildBufferWithAscendingLongs);
                boundedData.writeBuffer(compressToIntermediateBuffer);
                if (compressToIntermediateBuffer != buildBufferWithAscendingLongs) {
                    compressToIntermediateBuffer.recycleBuffer();
                }
            } else {
                boundedData.writeBuffer(buildBufferWithAscendingLongs);
            }
            i2++;
            buildBufferWithAscendingLongs.recycleBuffer();
            j = j2 + 131072;
        }
    }

    private static void readLongs(BoundedData.Reader reader, int i, int i2) throws IOException {
        int size;
        long j = 0;
        int i3 = 0;
        while (true) {
            Buffer nextBuffer = reader.nextBuffer();
            if (nextBuffer == null) {
                Assertions.assertThat(i3).isEqualTo(i);
                Assertions.assertThat(j).isGreaterThanOrEqualTo(i2);
                return;
            }
            if (compressionEnabled && nextBuffer.isCompressed()) {
                Buffer decompressToIntermediateBuffer = DECOMPRESSOR.decompressToIntermediateBuffer(nextBuffer);
                size = decompressToIntermediateBuffer.getSize() / 8;
                BufferBuilderTestUtils.validateBufferWithAscendingLongs(decompressToIntermediateBuffer, size, j);
                decompressToIntermediateBuffer.recycleBuffer();
            } else {
                size = nextBuffer.getSize() / 8;
                BufferBuilderTestUtils.validateBufferWithAscendingLongs(nextBuffer, size, j);
            }
            j += size;
            i3++;
            nextBuffer.recycleBuffer();
        }
    }
}
