package org.apache.flink.runtime.io.network.api.writer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.types.IntValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.class */
class RecordWriterDelegateTest {
    private static final int recordSize = 8;
    private static final int numberOfBuffers = 10;
    private static final int memorySegmentSize = 128;
    private NetworkBufferPool globalPool;

    RecordWriterDelegateTest() {
    }

    @BeforeEach
    void setup() {
        Assertions.assertThat(0).as("Illegal memory segment size", new Object[0]).isZero();
        this.globalPool = new NetworkBufferPool(10, 128);
    }

    @AfterEach
    void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    @Test
    void testSingleRecordWriterAvailability() throws Exception {
        RecordWriter createRecordWriter = createRecordWriter(this.globalPool);
        SingleRecordWriter singleRecordWriter = new SingleRecordWriter(createRecordWriter);
        Assertions.assertThat(singleRecordWriter.getRecordWriter(0)).isEqualTo(createRecordWriter);
        verifyAvailability(singleRecordWriter);
    }

    @Test
    void testMultipleRecordWritersAvailability() throws Exception {
        ArrayList arrayList = new ArrayList(2);
        for (int i = 0; i < 2; i++) {
            arrayList.add(createRecordWriter(this.globalPool));
        }
        MultipleRecordWriters multipleRecordWriters = new MultipleRecordWriters(arrayList);
        for (int i2 = 0; i2 < 2; i2++) {
            Assertions.assertThat(multipleRecordWriters.getRecordWriter(i2)).isEqualTo(arrayList.get(i2));
        }
        verifyAvailability(multipleRecordWriters);
    }

    @Test
    void testSingleRecordWriterBroadcastEvent() throws Exception {
        ResultPartition createResultPartition = RecordWriterTest.createResultPartition(128, 2);
        verifyBroadcastEvent(new SingleRecordWriter(new RecordWriterBuilder().build(createResultPartition)), Collections.singletonList(createResultPartition));
    }

    @Test
    void testMultipleRecordWritersBroadcastEvent() throws Exception {
        ArrayList arrayList = new ArrayList(2);
        ArrayList arrayList2 = new ArrayList(2);
        for (int i = 0; i < 2; i++) {
            ResultPartition createResultPartition = RecordWriterTest.createResultPartition(128, 2);
            arrayList2.add(createResultPartition);
            arrayList.add(new RecordWriterBuilder().build(createResultPartition));
        }
        verifyBroadcastEvent(new MultipleRecordWriters(arrayList), arrayList2);
    }

    private static RecordWriter createRecordWriter(NetworkBufferPool networkBufferPool) throws Exception {
        BufferPool createBufferPool = networkBufferPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        ResultPartition build = new ResultPartitionBuilder().setBufferPoolFactory(() -> {
            return createBufferPool;
        }).build();
        build.setup();
        return new RecordWriterBuilder().build(build);
    }

    private static void verifyAvailability(RecordWriterDelegate recordWriterDelegate) throws Exception {
        Assertions.assertThat(recordWriterDelegate.isAvailable()).isTrue();
        Assertions.assertThat(recordWriterDelegate.getAvailableFuture()).isDone();
        RecordWriter recordWriter = recordWriterDelegate.getRecordWriter(0);
        for (int i = 0; i < 16; i++) {
            recordWriter.emit(new IntValue(i));
        }
        Assertions.assertThat(recordWriterDelegate.isAvailable()).isFalse();
        CompletableFuture availableFuture = recordWriterDelegate.getAvailableFuture();
        Assertions.assertThat(availableFuture).isNotDone();
        recordWriter.getTargetPartition().createSubpartitionView(new ResultSubpartitionIndexSet(0), new NoOpBufferAvailablityListener()).getNextBuffer().buffer().recycleBuffer();
        Assertions.assertThat(availableFuture).isDone();
        Assertions.assertThat(recordWriterDelegate.isAvailable()).isTrue();
        Assertions.assertThat(recordWriterDelegate.getAvailableFuture()).isDone();
    }

    private static void verifyBroadcastEvent(RecordWriterDelegate recordWriterDelegate, List<ResultPartition> list) throws Exception {
        CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(1L);
        recordWriterDelegate.broadcastEvent(cancelCheckpointMarker);
        for (ResultPartition resultPartition : list) {
            for (int i = 0; i < resultPartition.getNumberOfSubpartitions(); i++) {
                Assertions.assertThat(resultPartition.getNumberOfQueuedBuffers(i)).isOne();
                BufferOrEvent parseBuffer = RecordWriterTest.parseBuffer(resultPartition.createSubpartitionView(new ResultSubpartitionIndexSet(i), new NoOpBufferAvailablityListener()).getNextBuffer().buffer(), i);
                Assertions.assertThat(parseBuffer.isEvent()).isTrue();
                Assertions.assertThat(parseBuffer.getEvent()).isEqualTo(cancelCheckpointMarker);
            }
        }
    }
}
