/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskCacheManager;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DiskCacheManager}.
 *
 * <p>Every test wires the manager to a {@link TestingPartitionFileWriter} whose callbacks record
 * what the manager hands over, and then asserts on those recordings.
 */
class DiskCacheManagerTest {

    // NOTE(review): the second and third constructor arguments of DiskCacheManager are presumably
    // the number of subpartitions and the cached-bytes threshold that triggers a flush; the
    // threshold reading is consistent with testFlushWhenCachedBytesReachLimit. Confirm against
    // the DiskCacheManager constructor.
    private static final int NUM_SUBPARTITIONS = 1;

    private static final int MAX_CACHED_BYTES_BEFORE_FLUSH = 1024;

    /** The only subpartition the tests write to. */
    private static final int SUBPARTITION_ID = 0;

    /**
     * Appends randomly sized buffers and checks that the buffer index advances once per append and
     * that, after {@code close()}, the writer has received every buffer and every byte.
     */
    @Test
    void testAppend() {
        int numAddBuffers = 100;
        int maxBufferSizeBytes = 100;
        // Keep the seed so that a failing run can be reproduced from the assertion message.
        long seed = System.nanoTime();
        Random random = new Random(seed);

        AtomicInteger numReceivedBuffers = new AtomicInteger(0);
        AtomicInteger numReceivedBytes = new AtomicInteger(0);
        TestingPartitionFileWriter partitionFileWriter =
                new TestingPartitionFileWriter.Builder()
                        .setWriteFunction(
                                (partitionId, subpartitionBufferContexts) -> {
                                    Tuple2<Integer, Integer> numBuffersAndBytes =
                                            getNumReceivedBuffersAndBytes(
                                                    subpartitionBufferContexts);
                                    numReceivedBuffers.getAndAdd(numBuffersAndBytes.f0);
                                    numReceivedBytes.getAndAdd(numBuffersAndBytes.f1);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        DiskCacheManager diskCacheManager = createDiskCacheManager(partitionFileWriter);

        int numExpectBytes = 0;
        for (int i = 0; i < numAddBuffers; ++i) {
            // nextInt(bound) is exclusive of bound, so sizes fall in [1, maxBufferSizeBytes].
            int bufferSizeBytes = random.nextInt(maxBufferSizeBytes) + 1;
            numExpectBytes += bufferSizeBytes;
            diskCacheManager.append(
                    BufferBuilderTestUtils.buildSomeBuffer(bufferSizeBytes),
                    SUBPARTITION_ID,
                    true);
        }

        Assertions.assertThat(diskCacheManager.getBufferIndex(SUBPARTITION_ID))
                .isEqualTo(numAddBuffers);
        diskCacheManager.close();
        Assertions.assertThat(numReceivedBuffers)
                .as("received buffers (random seed %d)", seed)
                .hasValue(numAddBuffers);
        Assertions.assertThat(numReceivedBytes)
                .as("received bytes (random seed %d)", seed)
                .hasValue(numExpectBytes);
    }

    /**
     * Appends a serialized {@link EndOfSegmentEvent} and checks that, after {@code close()}, the
     * writer received exactly one subpartition context holding one segment with one non-data
     * buffer that deserializes back to an {@link EndOfSegmentEvent}.
     */
    @Test
    void testAppendEndOfSegmentEvent() throws IOException {
        List<PartitionFileWriter.SubpartitionBufferContext> receivedBuffers = new ArrayList<>();
        TestingPartitionFileWriter partitionFileWriter =
                new TestingPartitionFileWriter.Builder()
                        .setWriteFunction(
                                (partitionId, subpartitionBufferContexts) -> {
                                    receivedBuffers.addAll(subpartitionBufferContexts);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        DiskCacheManager diskCacheManager = createDiskCacheManager(partitionFileWriter);

        diskCacheManager.appendEndOfSegmentEvent(
                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), SUBPARTITION_ID);
        diskCacheManager.close();

        Assertions.assertThat(receivedBuffers).hasSize(1);
        List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts =
                receivedBuffers.get(0).getSegmentBufferContexts();
        Assertions.assertThat(segmentBufferContexts).hasSize(1);
        List<Tuple2<Buffer, Integer>> bufferAndIndexes =
                segmentBufferContexts.get(0).getBufferAndIndexes();
        Assertions.assertThat(bufferAndIndexes).hasSize(1);

        Buffer buffer = bufferAndIndexes.get(0).f0;
        Assertions.assertThat(buffer.isBuffer()).isFalse();
        AbstractEvent event =
                EventSerializer.fromSerializedEvent(
                        buffer.readOnlySlice().getNioBufferReadable(),
                        getClass().getClassLoader());
        Assertions.assertThat(event).isInstanceOf(EndOfSegmentEvent.class);
    }

    /**
     * Counts writer invocations while appending: 1024 bytes cause no write, one more byte causes
     * the first write, a further 1024 bytes cause none, and the end-of-segment event causes the
     * second write.
     */
    @Test
    void testFlushWhenCachedBytesReachLimit() throws IOException {
        AtomicInteger numWriteTimes = new AtomicInteger(0);
        TestingPartitionFileWriter partitionFileWriter =
                new TestingPartitionFileWriter.Builder()
                        .setWriteFunction(
                                (partitionId, subpartitionBufferContexts) -> {
                                    numWriteTimes.incrementAndGet();
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        DiskCacheManager diskCacheManager = createDiskCacheManager(partitionFileWriter);

        diskCacheManager.append(
                BufferBuilderTestUtils.buildSomeBuffer(MAX_CACHED_BYTES_BEFORE_FLUSH),
                SUBPARTITION_ID,
                true);
        Assertions.assertThat(numWriteTimes).hasValue(0);

        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), SUBPARTITION_ID, true);
        Assertions.assertThat(numWriteTimes).hasValue(1);

        diskCacheManager.append(
                BufferBuilderTestUtils.buildSomeBuffer(MAX_CACHED_BYTES_BEFORE_FLUSH),
                SUBPARTITION_ID,
                true);
        Assertions.assertThat(numWriteTimes).hasValue(1);

        diskCacheManager.appendEndOfSegmentEvent(
                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), SUBPARTITION_ID);
        Assertions.assertThat(numWriteTimes).hasValue(2);
    }

    /** Checks that {@code release()} on the manager runs the writer's release callback. */
    @Test
    void testRelease() {
        AtomicBoolean isReleased = new AtomicBoolean(false);
        TestingPartitionFileWriter partitionFileWriter =
                new TestingPartitionFileWriter.Builder()
                        .setReleaseRunnable(() -> isReleased.set(true))
                        .build();
        DiskCacheManager diskCacheManager = createDiskCacheManager(partitionFileWriter);

        diskCacheManager.release();

        Assertions.assertThat(isReleased).isTrue();
    }

    /**
     * Builds a {@link DiskCacheManager} for a fresh partition id, backed by a default {@link
     * TestingTieredStorageMemoryManager} and the given writer.
     *
     * @param partitionFileWriter the writer that receives whatever the manager flushes
     * @return the manager under test
     */
    private static DiskCacheManager createDiskCacheManager(
            PartitionFileWriter partitionFileWriter) {
        TestingTieredStorageMemoryManager memoryManager =
                new TestingTieredStorageMemoryManager.Builder().build();
        return new DiskCacheManager(
                TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
                NUM_SUBPARTITIONS,
                MAX_CACHED_BYTES_BEFORE_FLUSH,
                memoryManager,
                partitionFileWriter);
    }

    /**
     * Sums, over every segment of every given subpartition context, the number of buffers and
     * their readable bytes.
     *
     * @param subpartitionBufferContexts the contexts passed to the writer in one write call
     * @return a tuple of (number of buffers, total readable bytes)
     */
    private static Tuple2<Integer, Integer> getNumReceivedBuffersAndBytes(
            List<PartitionFileWriter.SubpartitionBufferContext> subpartitionBufferContexts) {
        int numReceivedBuffers = 0;
        int numReceivedBytes = 0;
        for (PartitionFileWriter.SubpartitionBufferContext subpartitionBufferContext :
                subpartitionBufferContexts) {
            for (PartitionFileWriter.SegmentBufferContext segmentBufferContext :
                    subpartitionBufferContext.getSegmentBufferContexts()) {
                List<Tuple2<Buffer, Integer>> bufferAndIndexes =
                        segmentBufferContext.getBufferAndIndexes();
                numReceivedBuffers += bufferAndIndexes.size();
                for (Tuple2<Buffer, Integer> bufferAndIndex : bufferAndIndexes) {
                    numReceivedBytes += bufferAndIndex.f0.readableBytes();
                }
            }
        }
        return new Tuple2<>(numReceivedBuffers, numReceivedBytes);
    }
}

