/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageDataIdentifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingTieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierProducerAgent;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link DiskTierProducerAgent}.
 *
 * <p>Every agent under test is created through {@code createDiskTierProducerAgent}, which wires
 * the agent with testing implementations of the memory manager, the netty service and the
 * partition file reader, so each test only varies the broadcast flag, the segment size, the
 * reserved disk space fraction, the data file base path, the file writer and the resource
 * registry.
 */
public class DiskTierProducerAgentTest {
    @TempDir
    private Path tempFolder;

    private static final int NUM_SUBPARTITIONS = 10;
    private static final int BUFFER_SIZE_BYTES = 1024;
    private static final int NUM_BYTES_PER_SEGMENT = 2048;

    /** Suffix appended to the data file base path to form the path handed to the agent. */
    private static final String DATA_FILE_SUFFIX = ".tier-storage.data";

    private static final TieredStoragePartitionId PARTITION_ID =
            TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
    private static final TieredStorageSubpartitionId SUBPARTITION_ID =
            new TieredStorageSubpartitionId(0);

    /**
     * Starting a new segment succeeds when the data file exists and no disk space has to stay
     * reserved (fraction {@code 0}).
     */
    @Test
    void testStartNewSegmentSuccess() throws IOException {
        String partitionFile = TempDirUtils.newFile(tempFolder, "test").toString();
        // The agent is pointed at "<base path> + suffix", so that exact file is created up front.
        File testFile = new File(partitionFile + DATA_FILE_SUFFIX);
        Assertions.assertThat(testFile.createNewFile()).isTrue();

        try (DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        false,
                        NUM_BYTES_PER_SEGMENT,
                        0.0f,
                        partitionFile,
                        new TestingPartitionFileWriter.Builder().build(),
                        new TieredStorageResourceRegistry())) {
            Assertions.assertThat(diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0))
                    .isTrue();
        }
    }

    /** Starting a new segment is rejected when the reserved disk space fraction is {@code 1}. */
    @Test
    void testStartNewSegmentFailed() {
        try (DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        false,
                        NUM_BYTES_PER_SEGMENT,
                        1.0f,
                        tempFolder.toString(),
                        new TestingPartitionFileWriter.Builder().build(),
                        new TieredStorageResourceRegistry())) {
            Assertions.assertThat(diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0))
                    .isFalse();
        }
    }

    /** A single buffer can be written after a segment start has been requested. */
    @Test
    void testWriteSuccess() {
        try (DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        false,
                        BUFFER_SIZE_BYTES,
                        0.0f,
                        tempFolder.toString(),
                        new TestingPartitionFileWriter.Builder().build(),
                        new TieredStorageResourceRegistry())) {
            diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0);
            Assertions.assertThat(
                            diskTierProducerAgent.tryWrite(
                                    SUBPARTITION_ID,
                                    BufferBuilderTestUtils.buildSomeBuffer(),
                                    this,
                                    0))
                    .isTrue();
        }
    }

    /**
     * With a segment size equal to one buffer size, the first write is accepted and the second
     * one is rejected.
     */
    @Test
    void testWriteFailed() {
        try (DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        false,
                        BUFFER_SIZE_BYTES,
                        0.0f,
                        tempFolder.toString(),
                        new TestingPartitionFileWriter.Builder().build(),
                        new TieredStorageResourceRegistry())) {
            diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0);
            Assertions.assertThat(
                            diskTierProducerAgent.tryWrite(
                                    SUBPARTITION_ID,
                                    BufferBuilderTestUtils.buildSomeBuffer(),
                                    this,
                                    0))
                    .isTrue();
            // NOTE(review): presumably rejected because the segment is already full - confirm
            // against DiskTierProducerAgent#tryWrite.
            Assertions.assertThat(
                            diskTierProducerAgent.tryWrite(
                                    SUBPARTITION_ID,
                                    BufferBuilderTestUtils.buildSomeBuffer(BUFFER_SIZE_BYTES),
                                    this,
                                    0))
                    .isFalse();
        }
    }

    /**
     * For a broadcast-only agent, writing to subpartition 1 throws an
     * {@link ArrayIndexOutOfBoundsException} while writing to subpartition 0 succeeds.
     */
    @Test
    void testWriteBroadcastBuffer() {
        try (DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        true,
                        NUM_BYTES_PER_SEGMENT,
                        0.0f,
                        tempFolder.toString(),
                        new TestingPartitionFileWriter.Builder().build(),
                        new TieredStorageResourceRegistry())) {
            diskTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0);
            Assertions.assertThatThrownBy(
                            () ->
                                    diskTierProducerAgent.tryWrite(
                                            new TieredStorageSubpartitionId(1),
                                            BufferBuilderTestUtils.buildSomeBuffer(),
                                            this,
                                            0))
                    .isInstanceOf(ArrayIndexOutOfBoundsException.class);
            Assertions.assertThat(
                            diskTierProducerAgent.tryWrite(
                                    SUBPARTITION_ID,
                                    BufferBuilderTestUtils.buildSomeBuffer(),
                                    this,
                                    0))
                    .isTrue();
        }
    }

    /**
     * After the agent is closed and the registry clears the resources of the partition, the
     * release callback of the partition file writer has been invoked.
     */
    @Test
    void testRelease() {
        AtomicBoolean isReleased = new AtomicBoolean(false);
        TestingPartitionFileWriter partitionFileWriter =
                new TestingPartitionFileWriter.Builder()
                        .setReleaseRunnable(() -> isReleased.set(true))
                        .build();
        TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry();
        DiskTierProducerAgent diskTierProducerAgent =
                createDiskTierProducerAgent(
                        false,
                        NUM_BYTES_PER_SEGMENT,
                        0.0f,
                        tempFolder.toString(),
                        partitionFileWriter,
                        resourceRegistry);

        diskTierProducerAgent.close();
        resourceRegistry.clearResourceFor(PARTITION_ID);

        Assertions.assertThat(isReleased).isTrue();
    }

    /**
     * Builds a {@link DiskTierProducerAgent} for {@code PARTITION_ID} backed by testing
     * collaborators.
     *
     * @param isBroadcastOnly broadcast flag forwarded to the agent
     * @param numBytesPerSegment segment size in bytes forwarded to the agent
     * @param minReservedDiskSpaceFraction reserved disk space fraction forwarded to the agent
     * @param dataFileBasePath base path; {@code DATA_FILE_SUFFIX} is appended to form the data
     *     file path
     * @param partitionFileWriter writer forwarded to the agent
     * @param resourceRegistry registry forwarded to the agent
     * @return the newly created agent; the caller is responsible for closing it
     */
    private static DiskTierProducerAgent createDiskTierProducerAgent(
            boolean isBroadcastOnly,
            int numBytesPerSegment,
            float minReservedDiskSpaceFraction,
            String dataFileBasePath,
            PartitionFileWriter partitionFileWriter,
            TieredStorageResourceRegistry resourceRegistry) {
        TestingTieredStorageMemoryManager memoryManager =
                new TestingTieredStorageMemoryManager.Builder()
                        .setGetMaxNonReclaimableBuffersFunction(ignore -> Integer.MAX_VALUE)
                        .build();
        TestingTieredStorageNettyService nettyService =
                new TestingTieredStorageNettyService.Builder().build();
        TestingNettyServiceProducer nettyServiceProducer =
                new TestingNettyServiceProducer.Builder().build();
        nettyService.registerProducer(PARTITION_ID, nettyServiceProducer);

        Path dataFilePath = new File(dataFileBasePath + DATA_FILE_SUFFIX).toPath();
        PartitionFileReader partitionFileReader = new TestingPartitionFileReader.Builder().build();

        // NOTE(review): the 4th and 5th arguments are both 1024 (BUFFER_SIZE_BYTES); confirm the
        // meaning of the 5th argument against the DiskTierProducerAgent constructor.
        return new DiskTierProducerAgent(
                PARTITION_ID,
                NUM_SUBPARTITIONS,
                numBytesPerSegment,
                BUFFER_SIZE_BYTES,
                BUFFER_SIZE_BYTES,
                dataFilePath,
                minReservedDiskSpaceFraction,
                isBroadcastOnly,
                partitionFileWriter,
                partitionFileReader,
                memoryManager,
                nettyService,
                resourceRegistry,
                new BatchShuffleReadBufferPool(1L, 1),
                new ManuallyTriggeredScheduledExecutorService(),
                0,
                Duration.ofMinutes(5L));
    }
}

