/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={NoOpTestExtension.class})
public class PipelinedSubpartitionTest
extends SubpartitionTestBase {
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(Executors::newCachedThreadPool);

    PipelinedSubpartition createSubpartition() throws Exception {
        return PipelinedSubpartitionTest.createPipelinedSubpartition();
    }

    @Override
    ResultSubpartition createFailingWritesSubpartition() throws Exception {
        Assumptions.assumeThat((boolean)false).isTrue();
        return null;
    }

    @TestTemplate
    void testIllegalReadViewRequest() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assertions.assertThat((Object)subpartition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener())).isNotNull();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> subpartition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener())).withFailMessage("Did not throw expected exception after duplicate notifyNonEmpty view request.", new Object[0])).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testIsReleasedChecksParent() {
        PipelinedSubpartition subpartition = (PipelinedSubpartition)Mockito.mock(PipelinedSubpartition.class);
        PipelinedSubpartitionView reader = new PipelinedSubpartitionView(subpartition, (BufferAvailabilityListener)Mockito.mock(BufferAvailabilityListener.class));
        Assertions.assertThat((boolean)reader.isReleased()).isFalse();
        ((PipelinedSubpartition)Mockito.verify((Object)subpartition, (VerificationMode)Mockito.times((int)1))).isReleased();
        Mockito.when((Object)subpartition.isReleased()).thenReturn((Object)true);
        Assertions.assertThat((boolean)reader.isReleased()).isTrue();
        ((PipelinedSubpartition)Mockito.verify((Object)subpartition, (VerificationMode)Mockito.times((int)2))).isReleased();
    }

    @TestTemplate
    void testConcurrentFastProduceAndFastConsume() throws Exception {
        this.testProduceConsume(false, false);
    }

    @TestTemplate
    void testConcurrentFastProduceAndSlowConsume() throws Exception {
        this.testProduceConsume(false, true);
    }

    @TestTemplate
    void testConcurrentSlowProduceAndFastConsume() throws Exception {
        this.testProduceConsume(true, false);
    }

    @TestTemplate
    void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        this.testProduceConsume(true, true);
    }

    private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
        int producerNumberOfBuffersToProduce = 128;
        int bufferSize = 32768;
        TestProducerSource producerSource = new TestProducerSource(){
            private int numberOfBuffers;

            @Override
            public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
                if (this.numberOfBuffers == 128) {
                    return null;
                }
                MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)32768);
                int next = this.numberOfBuffers * 8192;
                for (int i = 0; i < 32768; i += 4) {
                    segment.putInt(i, next);
                    ++next;
                }
                ++this.numberOfBuffers;
                return new TestProducerSource.BufferAndChannel(segment.getArray(), 0);
            }
        };
        TestConsumerCallback consumerCallback = new TestConsumerCallback(){
            private int numberOfBuffers;

            @Override
            public void onBuffer(Buffer buffer) {
                MemorySegment segment = buffer.getMemorySegment();
                Assertions.assertThat((int)buffer.getSize()).isEqualTo(segment.size());
                int expected = this.numberOfBuffers * (segment.size() / 4);
                for (int i = 0; i < segment.size(); i += 4) {
                    Assertions.assertThat((int)segment.getInt(i)).isEqualTo(expected);
                    ++expected;
                }
                ++this.numberOfBuffers;
                buffer.recycleBuffer();
            }

            @Override
            public void onEvent(AbstractEvent event) {
            }
        };
        PipelinedSubpartition subpartition = this.createSubpartition();
        TestSubpartitionProducer producer = new TestSubpartitionProducer((ResultSubpartition)subpartition, isSlowProducer, producerSource);
        TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
        PipelinedSubpartitionView view = subpartition.createReadView((BufferAvailabilityListener)consumer);
        consumer.setSubpartitionView((ResultSubpartitionView)view);
        CompletableFuture producerResult = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(producer::call), EXECUTOR_EXTENSION.getExecutor());
        CompletableFuture consumerResult = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), EXECUTOR_EXTENSION.getExecutor());
        FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult)).get(60000L, TimeUnit.MILLISECONDS);
    }

    @TestTemplate
    void testCleanupReleasedPartitionNoView() throws Exception {
        this.testCleanupReleasedPartition(false);
    }

    @TestTemplate
    void testCleanupReleasedPartitionWithView() throws Exception {
        this.testCleanupReleasedPartition(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testCleanupReleasedPartition(boolean createView) throws Exception {
        boolean buffer2Recycled;
        boolean buffer1Recycled;
        PipelinedSubpartition partition = this.createSubpartition();
        BufferConsumer buffer1 = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        BufferConsumer buffer2 = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        try {
            partition.add(buffer1);
            partition.add(buffer2);
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers()).isEqualTo(2);
            PipelinedSubpartitionView view = null;
            if (createView) {
                view = partition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            }
            partition.release();
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers()).isZero();
            Assertions.assertThat((boolean)partition.isReleased()).isTrue();
            if (createView) {
                Assertions.assertThat((boolean)view.isReleased()).isTrue();
            }
            Assertions.assertThat((boolean)buffer1.isRecycled()).isTrue();
        }
        finally {
            buffer1Recycled = buffer1.isRecycled();
            if (!buffer1Recycled) {
                buffer1.close();
            }
            if (!(buffer2Recycled = buffer2.isRecycled())) {
                buffer2.close();
            }
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)buffer1Recycled).withFailMessage("buffer 1 not recycled", new Object[0])).isTrue();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)buffer2Recycled).withFailMessage("buffer 2 not recycled", new Object[0])).isTrue();
        Assertions.assertThat((long)partition.getTotalNumberOfBuffersUnsafe()).isEqualTo(2L);
        Assertions.assertThat((long)partition.getTotalNumberOfBytesUnsafe()).isZero();
    }

    @TestTemplate
    void testReleaseParent() throws Exception {
        PipelinedSubpartition partition = this.createSubpartition();
        this.verifyViewReleasedAfterParentRelease((ResultSubpartition)partition);
    }

    @TestTemplate
    void testNumberOfQueueBuffers() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assertions.assertThat((int)subpartition.getNumberOfQueuedBuffers()).isOne();
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assertions.assertThat((int)subpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
        subpartition.getNextBuffer();
        Assertions.assertThat((int)subpartition.getNumberOfQueuedBuffers()).isOne();
    }

    @TestTemplate
    public void testStartingBufferSize() throws Exception {
        int startingBufferSize = 1234;
        PipelinedSubpartition subpartition = PipelinedSubpartitionTest.createPipelinedSubpartition(startingBufferSize);
        Assert.assertEquals((long)startingBufferSize, (long)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));
    }

    @TestTemplate
    void testNewBufferSize() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assertions.assertThat((int)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        subpartition.bufferSize(42);
        Assertions.assertThat((int)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(42);
    }

    @TestTemplate
    void testNegativeNewBufferSize() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assertions.assertThat((int)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThatThrownBy(() -> subpartition.bufferSize(-1)).isInstanceOf(IllegalArgumentException.class);
    }

    @TestTemplate
    void testNegativeBufferSizeAsSignOfAddingFail() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assertions.assertThat((int)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        subpartition.finish();
        Assertions.assertThat((int)subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(-1);
    }

    @TestTemplate
    void testProducerFailedException() {
        FailurePipelinedSubpartition subpartition = new FailurePipelinedSubpartition(0, 2, PartitionTestUtils.createPartition());
        PipelinedSubpartitionView view = subpartition.createReadView(new NoOpBufferAvailablityListener());
        ((AbstractThrowableAssert)Assertions.assertThat((Throwable)view.getFailureCause()).isNotNull()).isInstanceOf(CancelTaskException.class);
    }

    @TestTemplate
    void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, null, true, 0L, false);
        this.testConsumeQuicklyWithNDataBuffers(0, subpartition, 5L);
        this.testConsumeQuicklyWithNDataBuffers(1, subpartition, 6L);
        this.testConsumeQuicklyWithNDataBuffers(2, subpartition, 7L);
    }

    private void testConsumeQuicklyWithNDataBuffers(int numberOfDataBuffers, PipelinedSubpartition subpartition, long checkpointId) throws Exception {
        for (int i = 0; i < numberOfDataBuffers; ++i) {
            subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        }
        subpartition.add(this.getTimeoutableBarrierBuffer(checkpointId));
        Assertions.assertThat((long)subpartition.getChannelStateCheckpointId()).isEqualTo(checkpointId);
        CompletableFuture channelStateFuture = subpartition.getChannelStateFuture();
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, channelStateFuture, false, numberOfDataBuffers + 1, false);
        for (int i = 0; i < numberOfDataBuffers; ++i) {
            this.pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER);
        }
        this.pollBufferAndCheckType(subpartition, Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER);
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, channelStateFuture, true, 0L, true);
        FlinkAssertions.assertThatFuture((CompletableFuture)channelStateFuture).eventuallySucceeds().asList().isEmpty();
        subpartition.resumeConsumption();
    }

    @TestTemplate
    void testTimeoutAlignedToUnalignedBarrier() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, null, true, 0L, false);
        this.testTimeoutWithNDataBuffers(0, subpartition, 7L);
        this.testTimeoutWithNDataBuffers(1, subpartition, 8L);
    }

    private void testTimeoutWithNDataBuffers(int numberOfDataBuffers, PipelinedSubpartition subpartition, long checkpointId) throws Exception {
        ArrayList<Buffer> expectedBuffers = new ArrayList<Buffer>();
        for (int i = 0; i < numberOfDataBuffers; ++i) {
            BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
            subpartition.add(bufferConsumer);
            expectedBuffers.add(bufferConsumer.copy().build());
        }
        subpartition.add(this.getTimeoutableBarrierBuffer(checkpointId));
        Assertions.assertThat((long)subpartition.getChannelStateCheckpointId()).isEqualTo(checkpointId);
        CompletableFuture channelStateFuture = subpartition.getChannelStateFuture();
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, channelStateFuture, false, numberOfDataBuffers + 1, false);
        subpartition.alignedBarrierTimeout(checkpointId);
        this.assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, channelStateFuture, true, numberOfDataBuffers + 1, true);
        this.pollBufferAndCheckType(subpartition, Buffer.DataType.PRIORITIZED_EVENT_BUFFER);
        for (int i = 0; i < numberOfDataBuffers; ++i) {
            this.pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER);
        }
        FlinkAssertions.assertThatFuture((CompletableFuture)channelStateFuture).eventuallySucceeds().isEqualTo(expectedBuffers);
    }

    private void pollBufferAndCheckType(PipelinedSubpartition subpartition, Buffer.DataType dataType) {
        ResultSubpartition.BufferAndBacklog barrierBuffer = subpartition.pollBuffer();
        Assertions.assertThat((Object)barrierBuffer).isNotNull();
        Assertions.assertThat((Comparable)barrierBuffer.buffer().getDataType()).isEqualTo((Object)dataType);
    }

    @TestTemplate
    void testConcurrentTimeoutableCheckpointBarrier() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        subpartition.add(this.getTimeoutableBarrierBuffer(10L));
        Assertions.assertThat((long)subpartition.getChannelStateCheckpointId()).isEqualTo(10L);
        CompletableFuture checkpointFuture10 = subpartition.getChannelStateFuture();
        Assertions.assertThat((CompletableFuture)checkpointFuture10).isNotNull();
        subpartition.add(this.getTimeoutableBarrierBuffer(11L));
        Assertions.assertThatThrownBy(checkpointFuture10::get).hasCauseInstanceOf(IllegalStateException.class).isInstanceOf(ExecutionException.class);
    }

    private BufferConsumer getTimeoutableBarrierBuffer(long checkpointId) throws IOException {
        CheckpointOptions checkpointOptions = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (long)1000L);
        return EventSerializer.toBufferConsumer((AbstractEvent)new CheckpointBarrier(checkpointId, System.currentTimeMillis(), checkpointOptions), (boolean)false);
    }

    private void assertSubpartitionChannelStateFuturesAndQueuedBuffers(PipelinedSubpartition subpartition, CompletableFuture<List<Buffer>> channelStateFuture, boolean channelStateFutureIsNull, long numberOfQueuedBuffers, boolean expectedFutureIsDone) {
        Assertions.assertThat((subpartition.getChannelStateFuture() == null ? 1 : 0) != 0).isEqualTo(channelStateFutureIsNull);
        Assertions.assertThat((int)subpartition.getNumberOfQueuedBuffers()).isEqualTo(numberOfQueuedBuffers);
        if (channelStateFuture != null) {
            Assertions.assertThat((boolean)channelStateFuture.isDone()).isEqualTo(expectedFutureIsDone);
        }
    }

    private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);
        partition.add(bufferConsumer);
        partition.finish();
        BufferAvailabilityListener listener = (BufferAvailabilityListener)Mockito.mock(BufferAvailabilityListener.class);
        ResultSubpartitionView view = partition.createReadView(listener);
        Assertions.assertThat((Object)view.getNextBuffer()).isNotNull();
        Assertions.assertThat((Object)view.getNextBuffer()).isNotNull();
        Assertions.assertThat((boolean)view.isReleased()).isFalse();
        partition.release();
        Assertions.assertThat((boolean)view.isReleased()).isTrue();
    }

    public static PipelinedSubpartition createPipelinedSubpartition(int startingBufferSize) {
        ResultPartition parent = PartitionTestUtils.createPartition();
        return new PipelinedSubpartition(0, 2, startingBufferSize, parent);
    }

    public static PipelinedSubpartition createPipelinedSubpartition() {
        ResultPartition parent = PartitionTestUtils.createPartition();
        return new PipelinedSubpartition(0, 2, Integer.MAX_VALUE, parent);
    }

    public static PipelinedSubpartition createPipelinedSubpartition(ResultPartition parent) {
        return new PipelinedSubpartition(0, 2, Integer.MAX_VALUE, parent);
    }

    private static class FailurePipelinedSubpartition
    extends PipelinedSubpartition {
        FailurePipelinedSubpartition(int index, int receiverExclusiveBuffersPerChannel, ResultPartition parent) {
            super(index, receiverExclusiveBuffersPerChannel, Integer.MAX_VALUE, parent);
        }

        Throwable getFailureCause() {
            return new RuntimeException("Expected test exception");
        }
    }
}

