package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link TieredStorageProducerClient}.
 *
 * <p>Every test is a template that runs once per value returned by {@link #parameters()}, i.e.
 * once with {@code isBroadcast == false} and once with {@code isBroadcast == true}.
 */
@ExtendWith({ParameterizedTestExtension.class})
public class TieredStorageProducerClientTest {
    /** Number of buffers in the global network buffer pool created for each test. */
    private static final int NUM_TOTAL_BUFFERS = 1000;

    /** Size in bytes of each buffer in the global network buffer pool. */
    private static final int NETWORK_BUFFER_SIZE = 1024;

    /** Number of subpartitions every producer client under test is created with. */
    private static final int NUM_SUBPARTITIONS = 10;

    /** Size in bytes of every randomly generated record written by the tests. */
    private static final int RECORD_SIZE = 1024;

    /** Number of records written by {@link #testWriteRecords()}. */
    private static final int NUM_RECORDS_TO_WRITE = 20;

    /** Value forwarded as the last argument of {@code TieredStorageProducerClient#write}. */
    @Parameter
    public boolean isBroadcast;

    // Created and destroyed around each test; no test in this class reads it directly.
    private NetworkBufferPool globalPool;

    /** Supplies the values injected into {@link #isBroadcast}. */
    @Parameters(name = "isBroadcast={0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
    }

    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    /** Writing through a client that has no tier agents must fail with a tier-selection error. */
    @TestTemplate
    void testWriteRecordsToEmptyStorageTiers() {
        Random random = new Random();
        TieredStorageProducerClient producerClient =
                createTieredStorageProducerClient(NUM_SUBPARTITIONS, Collections.emptyList());

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                producerClient.write(
                                        TieredStorageTestUtils.generateRandomData(
                                                RECORD_SIZE, random),
                                        new TieredStorageSubpartitionId(0),
                                        Buffer.DataType.DATA_BUFFER,
                                        this.isBroadcast))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Failed to choose a storage tier");
    }

    /**
     * Writes records through a client backed by two tier agents and checks how the buffers were
     * distributed.
     *
     * <p>The first agent reports that it can start a segment only while it has received no buffer,
     * and accepts a buffer only while its received count is even. The second agent accepts every
     * buffer. The test asserts that the first agent ends up with exactly one buffer, that the total
     * buffer count equals the sum of both tiers, and that the total byte count matches the amount
     * of data written.
     */
    @TestTemplate
    void testWriteRecords() throws IOException {
        Random random = new Random();
        AtomicInteger numReceivedBuffers = new AtomicInteger(0);
        AtomicInteger numReceivedBytes = new AtomicInteger(0);
        AtomicInteger numReceivedBuffersInTier1 = new AtomicInteger(0);
        AtomicInteger numReceivedBuffersInTier2 = new AtomicInteger(0);

        TestingTierProducerAgent tier1Agent =
                new TestingTierProducerAgent.Builder()
                        .setTryStartSegmentSupplier(
                                (subpartitionId, segmentId) -> numReceivedBuffersInTier1.get() < 1)
                        .setTryWriterFunction(
                                (subpartitionId, buffer) -> {
                                    boolean accepted = numReceivedBuffersInTier1.get() % 2 == 0;
                                    if (accepted) {
                                        numReceivedBuffers.incrementAndGet();
                                        numReceivedBuffersInTier1.incrementAndGet();
                                        numReceivedBytes.addAndGet(buffer.readableBytes());
                                    }
                                    return accepted;
                                })
                        .build();
        TestingTierProducerAgent tier2Agent =
                new TestingTierProducerAgent.Builder()
                        .setTryWriterFunction(
                                (subpartitionId, buffer) -> {
                                    numReceivedBuffers.incrementAndGet();
                                    numReceivedBuffersInTier2.incrementAndGet();
                                    numReceivedBytes.addAndGet(buffer.readableBytes());
                                    return true;
                                })
                        .build();

        // Order matters: tier1Agent is registered before tier2Agent.
        List<TierProducerAgent> tierProducerAgents = new ArrayList<>();
        tierProducerAgents.add(tier1Agent);
        tierProducerAgents.add(tier2Agent);

        TieredStorageProducerClient producerClient =
                createTieredStorageProducerClient(NUM_SUBPARTITIONS, tierProducerAgents);
        TieredStorageSubpartitionId subpartitionId = new TieredStorageSubpartitionId(0);
        for (int record = 0; record < NUM_RECORDS_TO_WRITE; record++) {
            producerClient.write(
                    TieredStorageTestUtils.generateRandomData(RECORD_SIZE, random),
                    subpartitionId,
                    Buffer.DataType.DATA_BUFFER,
                    this.isBroadcast);
        }

        // When broadcasting, the expected byte count is multiplied by the subpartition count.
        int numExpectedBytes =
                this.isBroadcast
                        ? NUM_SUBPARTITIONS * NUM_RECORDS_TO_WRITE * RECORD_SIZE
                        : NUM_RECORDS_TO_WRITE * RECORD_SIZE;
        Assertions.assertThat(numReceivedBuffersInTier1.get()).isEqualTo(1);
        Assertions.assertThat(numReceivedBuffers.get())
                .isEqualTo(numReceivedBuffersInTier1.get() + numReceivedBuffersInTier2.get());
        Assertions.assertThat(numReceivedBytes.get()).isEqualTo(numExpectedBytes);
    }

    /** Writing must fail when the only tier agent refuses to start a new segment. */
    @TestTemplate
    void testTierCanNotStartNewSegment() {
        Random random = new Random();
        TestingTierProducerAgent refusingAgent =
                new TestingTierProducerAgent.Builder()
                        .setTryStartSegmentSupplier((subpartitionId, segmentId) -> false)
                        .build();
        TieredStorageProducerClient producerClient =
                createTieredStorageProducerClient(
                        NUM_SUBPARTITIONS, Collections.singletonList(refusingAgent));

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                producerClient.write(
                                        TieredStorageTestUtils.generateRandomData(
                                                RECORD_SIZE, random),
                                        new TieredStorageSubpartitionId(0),
                                        Buffer.DataType.DATA_BUFFER,
                                        this.isBroadcast))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Failed to choose a storage tier");
    }

    /**
     * Writes a single record and checks the buffer and byte deltas reported to the metric
     * statistics updater.
     */
    @TestTemplate
    void testUpdateMetrics() throws IOException {
        Random random = new Random();
        TieredStorageProducerClient producerClient =
                new TieredStorageProducerClient(
                        NUM_SUBPARTITIONS,
                        false,
                        new TestingBufferAccumulator(),
                        (BufferCompressor) null,
                        Collections.singletonList(new TestingTierProducerAgent.Builder().build()));

        AtomicInteger numWriteBuffers = new AtomicInteger(0);
        AtomicInteger numWriteBytes = new AtomicInteger(0);
        producerClient.setMetricStatisticsUpdater(
                metricUpdate -> {
                    numWriteBuffers.addAndGet(metricUpdate.numWriteBuffersDelta());
                    numWriteBytes.addAndGet(metricUpdate.numWriteBytesDelta());
                });

        producerClient.write(
                TieredStorageTestUtils.generateRandomData(RECORD_SIZE, random),
                new TieredStorageSubpartitionId(0),
                Buffer.DataType.DATA_BUFFER,
                this.isBroadcast);

        // When broadcasting, one buffer per subpartition is expected instead of a single one.
        int numExpectedBuffers = this.isBroadcast ? NUM_SUBPARTITIONS : 1;
        int numExpectedBytes = this.isBroadcast ? RECORD_SIZE * NUM_SUBPARTITIONS : RECORD_SIZE;
        Assertions.assertThat(numWriteBuffers.get()).isEqualTo(numExpectedBuffers);
        Assertions.assertThat(numWriteBytes.get()).isEqualTo(numExpectedBytes);
    }

    /** Closing the client must run the close hook of its tier agent. */
    @TestTemplate
    void testClose() {
        AtomicBoolean isClosed = new AtomicBoolean(false);
        TestingTierProducerAgent agent =
                new TestingTierProducerAgent.Builder()
                        .setCloseRunnable(() -> isClosed.set(true))
                        .build();
        TieredStorageProducerClient producerClient =
                createTieredStorageProducerClient(
                        NUM_SUBPARTITIONS, Collections.singletonList(agent));

        Assertions.assertThat(isClosed.get()).isFalse();
        producerClient.close();
        Assertions.assertThat(isClosed.get()).isTrue();
    }

    /**
     * Builds a producer client over the given tier agents, using a {@link
     * TestingBufferAccumulator}, no buffer compressor, and a metric updater that ignores every
     * update.
     *
     * @param numSubpartitions the subpartition count passed to the client constructor
     * @param tierProducerAgents the tier agents the client is created with, in the given order
     * @return the configured client
     */
    private static TieredStorageProducerClient createTieredStorageProducerClient(
            int numSubpartitions, List<TierProducerAgent> tierProducerAgents) {
        TieredStorageProducerClient producerClient =
                new TieredStorageProducerClient(
                        numSubpartitions,
                        false,
                        new TestingBufferAccumulator(),
                        (BufferCompressor) null,
                        tierProducerAgents);
        // Tests that do not inspect metrics still need an updater installed; use a no-op.
        producerClient.setMetricStatisticsUpdater(metricUpdate -> {});
        return producerClient;
    }
}
