package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersisterTest.class */
/**
 * Tests for {@link ChannelStatePersister}, driving it through sequences of
 * {@code startPersisting} / {@code stopPersisting} / {@code checkForBarrier} / {@code maybePersist}
 * calls and checking what ends up recorded in the channel state writer and what
 * {@code hasBarrierReceived()} reports afterwards.
 */
public class ChannelStatePersisterTest {

    /**
     * A barrier for checkpoint {@code n + 1} is observed while checkpoint {@code n} is still being
     * persisted; stopping checkpoint {@code n} afterwards must leave the "barrier received" flag
     * set, and no buffer other than the one handed to {@code startPersisting} may be recorded.
     */
    @Test
    public void testNewBarrierNotOverwrittenByStopPersisting() throws Exception {
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        // Kept in a local because it is also the key used to look up the recorded input below.
        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister(channelStateWriter, channelInfo);

        long checkpointId = 1L;
        channelStateWriter.start(
                checkpointId,
                CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));

        // The barrier is seen before persisting starts, so only the buffer passed to
        // startPersisting is expected in the writer.
        persister.checkForBarrier(barrier(checkpointId));
        persister.startPersisting(
                checkpointId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        Assert.assertEquals(1L, channelStateWriter.getAddedInput().get(channelInfo).size());

        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        Assert.assertEquals(1L, channelStateWriter.getAddedInput().get(channelInfo).size());

        // The barrier of the next checkpoint arrives before the current one is stopped.
        persister.checkForBarrier(barrier(checkpointId + 1));
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        persister.stopPersisting(checkpointId);
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        Assert.assertEquals(1L, channelStateWriter.getAddedInput().get(channelInfo).size());

        Assert.assertTrue(persister.hasBarrierReceived());
    }

    /**
     * After persisting has been started for checkpoints 1 and 2, a barrier for checkpoint 1 must
     * not be reported by {@code checkForBarrier} and must not set the "barrier received" flag.
     */
    @Test
    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws Exception {
        ChannelStatePersister persister =
                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));

        persister.startPersisting(1L, Collections.emptyList());
        persister.startPersisting(2L, Collections.emptyList());

        Assert.assertFalse(persister.checkForBarrier(barrier(1L)).isPresent());
        Assert.assertFalse(persister.hasBarrierReceived());
    }

    /** Late barrier for a checkpoint that was both started and stopped on this persister. */
    @Test
    public void testLateBarrierOnStartedAndCancelledCheckpoint() throws Exception {
        testLateBarrier(true, true);
    }

    /** Late barrier for a checkpoint that was stopped without ever being started. */
    @Test
    public void testLateBarrierOnCancelledCheckpoint() throws Exception {
        testLateBarrier(false, true);
    }

    /** Late barrier for a checkpoint that was neither started nor stopped on this persister. */
    @Test
    public void testLateBarrierOnNotYetCancelledCheckpoint() throws Exception {
        testLateBarrier(false, false);
    }

    /**
     * Shared scenario: a barrier for checkpoint 1 shows up before checkpoint 2 is persisted, and
     * checkpoint 2 must still record exactly two buffers and see its own barrier.
     *
     * @param startCheckpointOnLateBarrier whether {@code startPersisting(1)} is called before the
     *     late barrier is checked
     * @param cancelCheckpointBeforeLateBarrier whether {@code stopPersisting(1)} is called before
     *     the late barrier is checked
     */
    private void testLateBarrier(
            boolean startCheckpointOnLateBarrier, boolean cancelCheckpointBeforeLateBarrier)
            throws Exception {
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        // Kept in a local because it is also the key used to look up the recorded input below.
        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister(channelStateWriter, channelInfo);

        long lateCheckpointId = 1L;
        long checkpointId = 2L;

        if (startCheckpointOnLateBarrier) {
            persister.startPersisting(lateCheckpointId, Collections.emptyList());
        }
        if (cancelCheckpointBeforeLateBarrier) {
            persister.stopPersisting(lateCheckpointId);
        }
        persister.checkForBarrier(barrier(lateCheckpointId));

        channelStateWriter.start(
                checkpointId,
                CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
        persister.startPersisting(
                checkpointId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        persister.checkForBarrier(barrier(checkpointId));
        // This buffer comes after the barrier of checkpoint 2; the final count of 2 below means
        // it is not expected to be recorded.
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());

        Assert.assertTrue(persister.hasBarrierReceived());
        Assert.assertEquals(2L, channelStateWriter.getAddedInput().get(channelInfo).size());
    }

    /**
     * Calling {@code startPersisting(1)} after a barrier for checkpoint 2 has already been checked
     * is expected to fail with a {@link CheckpointException}.
     */
    @Test(expected = CheckpointException.class)
    public void testLateBarrierTriggeringCheckpoint() throws Exception {
        ChannelStatePersister persister =
                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));

        persister.checkForBarrier(barrier(2L));
        persister.startPersisting(1L, Collections.emptyList());
    }

    /**
     * Serializes a {@link CheckpointBarrier} into a {@link Buffer} via {@link EventSerializer}.
     *
     * @param id checkpoint id carried by the barrier; the barrier's second constructor argument is
     *     fixed to {@code 1L}
     * @return the buffer produced by {@code EventSerializer.toBuffer(barrier, true)}
     * @throws IOException if serializing the barrier fails
     */
    private static Buffer barrier(long id) throws IOException {
        CheckpointBarrier checkpointBarrier =
                new CheckpointBarrier(id, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
        return EventSerializer.toBuffer(checkpointBarrier, true);
    }
}
