package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.util.clock.SystemClock;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.class */
public class CheckpointBarrierTrackerTest {
    private static final int PAGE_SIZE = 512;
    private CheckpointedInputGate inputGate;

    @After
    public void ensureEmpty() throws Exception {
        Assert.assertFalse(this.inputGate.pollNext().isPresent());
        Assert.assertTrue(this.inputGate.isFinished());
    }

    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(1, bufferOrEventArr);
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(2), createBuffer(2), createBuffer(0), createBuffer(1), createBuffer(0), createBuffer(3), createBuffer(1), createBuffer(1), createBuffer(2)};
        this.inputGate = createCheckpointedInputGate(4, bufferOrEventArr);
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(2L, 0), createBarrier(3L, 0), createBuffer(0), createBuffer(0), createBarrier(4L, 0), createBarrier(5L, 0), createBarrier(6L, 0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(1, bufferOrEventArr, new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testSingleChannelWithSkippedBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBarrier(3L, 0), createBuffer(0), createBarrier(4L, 0), createBarrier(6L, 0), createBuffer(0), createBarrier(7L, 0), createBuffer(0), createBarrier(10L, 0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(1, bufferOrEventArr, new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 0), createBarrier(3L, 1), createBarrier(4L, 1), createBarrier(4L, 2), createBarrier(4L, 0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(3, bufferOrEventArr, new CheckpointSequenceValidator(1, 2, 3, 4));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMultiChannelSkippingCheckpoints() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 2), createBuffer(2), createBuffer(2), createBarrier(4L, 0), createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(4L, 1), createBuffer(1), createBarrier(4L, 2), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(3, bufferOrEventArr, new CheckpointSequenceValidator(1, 2, 4));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testCompleteCheckpointsOnLateBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2), createBarrier(2L, 1), createBarrier(2L, 0), createBarrier(2L, 2), createBuffer(1), createBuffer(0), createBarrier(3L, 1), createBarrier(3L, 2), createBuffer(1), createBuffer(0), createBarrier(4L, 2), createBarrier(4L, 1), createBuffer(1), createBuffer(2), createBarrier(3L, 0), createBuffer(0), createBarrier(4L, 0), createBuffer(1), createBuffer(2), createBarrier(5L, 1), createBuffer(0), createBarrier(5L, 0), createBuffer(1), createBarrier(5L, 2), createBuffer(1), createBarrier(6L, 1), createBuffer(0), createBarrier(6L, 0), createBuffer(1), createBarrier(7L, 1), createBuffer(0), createBarrier(7L, 2), createBuffer(2), createBarrier(8L, 2), createBuffer(0), createBarrier(8L, 1), createBuffer(1), createBarrier(9L, 1), createBarrier(7L, 0), createBuffer(0), createBarrier(9L, 2), createBuffer(2), createBarrier(10L, 2), createBarrier(8L, 0), createBuffer(1), createBuffer(2), createBarrier(9L, 0), createBuffer(1), createBuffer(0), createBuffer(2), createBarrier(10L, 0), createBarrier(10L, 1)};
        this.inputGate = createCheckpointedInputGate(3, bufferOrEventArr, new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBarrier(2L, 0), createCancellationBarrier(4L, 0), createBarrier(5L, 0), createBuffer(0), createCancellationBarrier(6L, 0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(1, bufferOrEventArr, new CheckpointSequenceValidator(1, 2, -4, 5, -6));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBarrier(1L, 0), createBuffer(0), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 2), createBuffer(0), createBuffer(2), createCancellationBarrier(2L, 1), createBuffer(2), createBuffer(1), createBarrier(3L, 1), createBarrier(3L, 2), createBarrier(3L, 0), createBuffer(0), createBuffer(1), createCancellationBarrier(4L, 1), createBarrier(4L, 2), createBuffer(0), createBarrier(4L, 0), createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(5L, 2), createBarrier(5L, 1), createBarrier(5L, 0), createBuffer(0), createBuffer(1), createCancellationBarrier(6L, 1), createCancellationBarrier(6L, 2), createBarrier(6L, 0), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(3, bufferOrEventArr, new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testInterleavedCancellationBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBarrier(1L, 0), createCancellationBarrier(2L, 0), createCancellationBarrier(1L, 1), createCancellationBarrier(2L, 1), createCancellationBarrier(1L, 2), createCancellationBarrier(2L, 2), createBuffer(0)};
        this.inputGate = createCheckpointedInputGate(3, bufferOrEventArr, new CheckpointSequenceValidator(-1, -2));
        for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
            Assert.assertEquals(bufferOrEvent, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMetrics() throws Exception {
        ArrayList arrayList = new ArrayList();
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(3, validatingCheckpointHandler);
        int[] iArr = new int[3];
        long currentTimeMillis = System.currentTimeMillis();
        long nanoTime = System.nanoTime();
        Thread.sleep(10L);
        UnalignedCheckpointsTest.addSequence(this.inputGate, arrayList, iArr, createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), createBarrier(1L, 1, currentTimeMillis), createBuffer(0, 100), createBuffer(2, 100), createBarrier(1L, 0), createBuffer(2, 100));
        Thread.sleep(10L);
        UnalignedCheckpointsTest.addSequence(this.inputGate, arrayList, iArr, createBarrier(1L, 2), createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2));
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        long nanoTime2 = System.nanoTime() - nanoTime;
        Assert.assertThat(Long.valueOf(this.inputGate.getCheckpointStartDelayNanos() / 1000000), Matchers.is(Matchers.both(Matchers.greaterThanOrEqualTo(10L)).and(Matchers.lessThanOrEqualTo(Long.valueOf(currentTimeMillis2)))));
        Assert.assertTrue(validatingCheckpointHandler.getLastAlignmentDurationNanos().isDone());
        Assert.assertThat(Long.valueOf(validatingCheckpointHandler.getLastAlignmentDurationNanos().get().longValue() / 1000000), Matchers.is(Matchers.both(Matchers.greaterThanOrEqualTo(10L)).and(Matchers.lessThanOrEqualTo(Long.valueOf(nanoTime2)))));
        Assert.assertTrue(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment().isDone());
        Assert.assertThat(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment().get(), Matchers.equalTo(Long.valueOf(3 * 100)));
    }

    @Test
    public void testSingleChannelMetrics() throws Exception {
        ArrayList arrayList = new ArrayList();
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(1, validatingCheckpointHandler);
        long currentTimeMillis = System.currentTimeMillis();
        Thread.sleep(10L);
        UnalignedCheckpointsTest.addSequence(this.inputGate, arrayList, new int[1], createBuffer(0, 100), createBarrier(1L, 0, currentTimeMillis), createBuffer(0, 100), createEndOfPartition(0));
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Assert.assertThat(Long.valueOf(this.inputGate.getCheckpointStartDelayNanos() / 1000000), Matchers.greaterThanOrEqualTo(10L));
        Assert.assertThat(Long.valueOf(this.inputGate.getCheckpointStartDelayNanos() / 1000000), Matchers.lessThanOrEqualTo(Long.valueOf(currentTimeMillis2)));
        Assert.assertTrue(validatingCheckpointHandler.getLastAlignmentDurationNanos().isDone());
        Assert.assertThat(validatingCheckpointHandler.getLastAlignmentDurationNanos().get(), Matchers.equalTo(0L));
        Assert.assertTrue(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment().isDone());
        Assert.assertThat(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment().get(), Matchers.equalTo(0L));
    }

    private CheckpointedInputGate createCheckpointedInputGate(int i, AbstractInvokable abstractInvokable) throws IOException {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate build2 = new SingleInputGateBuilder().setNumberOfChannels(i).setupBufferPoolFactory(build).build();
        build2.setInputChannels((InputChannel[]) IntStream.range(0, i).mapToObj(i2 -> {
            return InputChannelBuilder.newBuilder().setChannelIndex(i2).setupFromNettyShuffleEnvironment(build).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(build2);
        }).toArray(i3 -> {
            return new RemoteInputChannel[i3];
        }));
        build2.setup();
        build2.requestPartitions();
        return createCheckpointedInputGate((IndexedInputGate) build2, abstractInvokable);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(int i, BufferOrEvent[] bufferOrEventArr) {
        return createCheckpointedInputGate(i, bufferOrEventArr, new DummyCheckpointInvokable());
    }

    private static CheckpointedInputGate createCheckpointedInputGate(int i, BufferOrEvent[] bufferOrEventArr, @Nullable AbstractInvokable abstractInvokable) {
        return createCheckpointedInputGate(new MockInputGate(i, Arrays.asList(bufferOrEventArr)), abstractInvokable);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate indexedInputGate, @Nullable AbstractInvokable abstractInvokable) {
        return new CheckpointedInputGate(indexedInputGate, new CheckpointBarrierTracker(indexedInputGate.getNumberOfInputChannels(), abstractInvokable, SystemClock.getInstance()), new SyncMailboxExecutor());
    }

    private static BufferOrEvent createBarrier(long j, int i) {
        return createBarrier(j, i, System.currentTimeMillis());
    }

    private static BufferOrEvent createBarrier(long j, int i, long j2) {
        return new BufferOrEvent(new CheckpointBarrier(j, j2, CheckpointOptions.forCheckpointWithDefaultLocation()), new InputChannelInfo(0, i));
    }

    private static BufferOrEvent createCancellationBarrier(long j, int i) {
        return new BufferOrEvent(new CancelCheckpointMarker(j), new InputChannelInfo(0, i));
    }

    private static BufferOrEvent createBuffer(int i) {
        return new BufferOrEvent(new NetworkBuffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), new InputChannelInfo(0, i));
    }

    private static BufferOrEvent createBuffer(int i, int i2) {
        return new BufferOrEvent(TestBufferFactory.createBuffer(i2), new InputChannelInfo(0, i));
    }

    private static BufferOrEvent createEndOfPartition(int i) {
        return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, new InputChannelInfo(0, i));
    }
}
