/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link TieredStorageProducerClient}.
 *
 * <p>Every test template is executed once per value returned by {@link #parameters()}, i.e. with
 * {@code isBroadcast} set to {@code false} and to {@code true}.
 */
@ExtendWith(ParameterizedTestExtension.class)
class TieredStorageProducerClientTest {
    /** Number of buffers the global network buffer pool is created with. */
    private static final int NUM_TOTAL_BUFFERS = 1000;

    /** Size argument the global network buffer pool is created with. */
    private static final int NETWORK_BUFFER_SIZE = 1024;

    /** Passed as the broadcast flag to every {@code write} call issued by the tests. */
    @Parameter
    private boolean isBroadcast;

    /** Pool created before and destroyed after each test. */
    private NetworkBufferPool globalPool;

    /** Supplies the values injected into {@link #isBroadcast}. */
    @Parameters(name="isBroadcast={0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    /** Creates a fresh {@link NetworkBufferPool} for the test about to run. */
    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
    }

    /** Destroys the {@link NetworkBufferPool} created in {@link #before()}. */
    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    /**
     * Writing a record through a client that has no tier producer agents must fail with a
     * {@link RuntimeException} whose message contains "Failed to choose a storage tier".
     */
    @TestTemplate
    void testWriteRecordsToEmptyStorageTiers() {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        Random random = new Random();
        TieredStorageProducerClient tieredStorageProducerClient =
                createTieredStorageProducerClient(numSubpartitions, Collections.emptyList());

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                tieredStorageProducerClient.write(
                                        TieredStorageTestUtils.generateRandomData(bufferSize, random),
                                        new TieredStorageSubpartitionId(0),
                                        Buffer.DataType.DATA_BUFFER,
                                        this.isBroadcast))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Failed to choose a storage tier");
    }

    /**
     * Writes several records through a client backed by two tiers and checks how the buffers
     * were distributed.
     *
     * <p>Tier 1 only agrees to start a segment while it has received no buffer yet, and only
     * accepts a write while its received-buffer count is even. Tier 2 accepts every write. The
     * test expects exactly one buffer in tier 1, the total buffer count to equal the sum of both
     * tiers, and the total byte count to match the amount written.
     *
     * @throws IOException if a write fails
     */
    @TestTemplate
    void testWriteRecords() throws IOException {
        int numSubpartitions = 10;
        int numToWriteRecords = 20;
        int bufferSize = 1024;
        Random random = new Random();
        AtomicInteger numReceivedBuffers = new AtomicInteger(0);
        AtomicInteger numReceivedBytes = new AtomicInteger(0);
        AtomicInteger numReceivedBuffersInTier1 = new AtomicInteger(0);
        AtomicInteger numReceivedBuffersInTier2 = new AtomicInteger(0);

        TestingTierProducerAgent tierProducerAgent1 =
                new TestingTierProducerAgent.Builder()
                        .setTryStartSegmentSupplier(
                                (subpartitionId, integer) -> numReceivedBuffersInTier1.get() < 1)
                        .setTryWriterFunction(
                                (subpartitionId, buffer) -> {
                                    // Accept the buffer only while tier 1 holds an even number
                                    // of buffers, so the write after the first one is rejected.
                                    boolean isSuccess = numReceivedBuffersInTier1.get() % 2 == 0;
                                    if (isSuccess) {
                                        numReceivedBuffers.incrementAndGet();
                                        numReceivedBuffersInTier1.incrementAndGet();
                                        numReceivedBytes.addAndGet(buffer.readableBytes());
                                    }
                                    return isSuccess;
                                })
                        .build();
        TestingTierProducerAgent tierProducerAgent2 =
                new TestingTierProducerAgent.Builder()
                        .setTryWriterFunction(
                                (subpartitionId, buffer) -> {
                                    numReceivedBuffers.incrementAndGet();
                                    numReceivedBuffersInTier2.incrementAndGet();
                                    numReceivedBytes.addAndGet(buffer.readableBytes());
                                    return true;
                                })
                        .build();

        // Order matters for the assertions below: tier 1 is registered before tier 2.
        List<TierProducerAgent> tierProducerAgents = new ArrayList<>();
        tierProducerAgents.add(tierProducerAgent1);
        tierProducerAgents.add(tierProducerAgent2);
        TieredStorageProducerClient tieredStorageProducerClient =
                createTieredStorageProducerClient(numSubpartitions, tierProducerAgents);

        TieredStorageSubpartitionId targetSubpartition = new TieredStorageSubpartitionId(0);
        for (int i = 0; i < numToWriteRecords; ++i) {
            tieredStorageProducerClient.write(
                    TieredStorageTestUtils.generateRandomData(bufferSize, random),
                    targetSubpartition,
                    Buffer.DataType.DATA_BUFFER,
                    this.isBroadcast);
        }

        // With the broadcast flag set, the expected byte count scales with numSubpartitions.
        int numExpectedBytes =
                this.isBroadcast
                        ? numSubpartitions * numToWriteRecords * bufferSize
                        : numToWriteRecords * bufferSize;
        Assertions.assertThat(numReceivedBuffersInTier1.get()).isEqualTo(1);
        Assertions.assertThat(numReceivedBuffers.get())
                .isEqualTo(numReceivedBuffersInTier1.get() + numReceivedBuffersInTier2.get());
        Assertions.assertThat(numReceivedBytes.get()).isEqualTo(numExpectedBytes);
    }

    /**
     * When the only tier refuses to start a segment, writing must fail with a
     * {@link RuntimeException} whose message contains "Failed to choose a storage tier".
     */
    @TestTemplate
    void testTierCanNotStartNewSegment() {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        Random random = new Random();
        TestingTierProducerAgent tierProducerAgent =
                new TestingTierProducerAgent.Builder()
                        .setTryStartSegmentSupplier((subpartitionId, integer) -> false)
                        .build();
        TieredStorageProducerClient tieredStorageProducerClient =
                createTieredStorageProducerClient(
                        numSubpartitions, Collections.singletonList(tierProducerAgent));

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                tieredStorageProducerClient.write(
                                        TieredStorageTestUtils.generateRandomData(bufferSize, random),
                                        new TieredStorageSubpartitionId(0),
                                        Buffer.DataType.DATA_BUFFER,
                                        this.isBroadcast))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Failed to choose a storage tier");
    }

    /**
     * Writes a single record and checks the buffer and byte deltas reported to the metric
     * statistics updater.
     *
     * @throws IOException if the write fails
     */
    @TestTemplate
    void testUpdateMetrics() throws IOException {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        Random random = new Random();
        TestingTierProducerAgent tierProducerAgent = new TestingTierProducerAgent.Builder().build();
        TieredStorageProducerClient tieredStorageProducerClient =
                new TieredStorageProducerClient(
                        numSubpartitions,
                        false,
                        new TestingBufferAccumulator(),
                        null,
                        Collections.singletonList(tierProducerAgent));

        // Accumulate every delta the client reports so the totals can be asserted afterwards.
        AtomicInteger numWriteBuffers = new AtomicInteger(0);
        AtomicInteger numWriteBytes = new AtomicInteger(0);
        tieredStorageProducerClient.setMetricStatisticsUpdater(
                metricStatistics -> {
                    numWriteBuffers.addAndGet(metricStatistics.numWriteBuffersDelta());
                    numWriteBytes.addAndGet(metricStatistics.numWriteBytesDelta());
                });

        tieredStorageProducerClient.write(
                TieredStorageTestUtils.generateRandomData(bufferSize, random),
                new TieredStorageSubpartitionId(0),
                Buffer.DataType.DATA_BUFFER,
                this.isBroadcast);

        // With the broadcast flag set, both expected totals scale with numSubpartitions.
        int numExpectedBuffers = this.isBroadcast ? numSubpartitions : 1;
        int numExpectedBytes = this.isBroadcast ? bufferSize * numSubpartitions : bufferSize;
        Assertions.assertThat(numWriteBuffers.get()).isEqualTo(numExpectedBuffers);
        Assertions.assertThat(numWriteBytes.get()).isEqualTo(numExpectedBytes);
    }

    /** Closing the client must run the close callback of its tier producer agent. */
    @TestTemplate
    void testClose() {
        int numSubpartitions = 10;
        AtomicBoolean isClosed = new AtomicBoolean(false);
        TestingTierProducerAgent tierProducerAgent =
                new TestingTierProducerAgent.Builder()
                        .setCloseRunnable(() -> isClosed.set(true))
                        .build();
        TieredStorageProducerClient tieredStorageProducerClient =
                createTieredStorageProducerClient(
                        numSubpartitions, Collections.singletonList(tierProducerAgent));

        Assertions.assertThat(isClosed.get()).isFalse();
        tieredStorageProducerClient.close();
        Assertions.assertThat(isClosed.get()).isTrue();
    }

    /**
     * Builds a {@link TieredStorageProducerClient} over the given tiers, using a
     * {@link TestingBufferAccumulator} and a metric statistics updater that ignores its input.
     *
     * @param numSubpartitions number of subpartitions handed to the client
     * @param tierProducerAgents tier producer agents handed to the client, in the given order
     * @return the configured client
     */
    private static TieredStorageProducerClient createTieredStorageProducerClient(int numSubpartitions, List<TierProducerAgent> tierProducerAgents) {
        TieredStorageProducerClient tieredStorageProducerClient =
                new TieredStorageProducerClient(
                        numSubpartitions,
                        false,
                        new TestingBufferAccumulator(),
                        null,
                        tierProducerAgents);
        tieredStorageProducerClient.setMetricStatisticsUpdater(metricStatistics -> {});
        return tieredStorageProducerClient;
    }
}

