/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.RuntimeEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnalignedCheckpointsCancellationTest {
    private final List<RuntimeEvent> events;
    private final boolean expectTriggerCheckpoint;
    private final boolean expectAbortCheckpoint;
    private final int numChannels;
    private final int channel;

    public UnalignedCheckpointsCancellationTest(boolean expectTriggerCheckpoint, boolean expectAbortCheckpoint, List<RuntimeEvent> events, int numChannels, int channel) {
        this.events = events;
        this.expectTriggerCheckpoint = expectTriggerCheckpoint;
        this.expectAbortCheckpoint = expectAbortCheckpoint;
        this.numChannels = numChannels;
        this.channel = channel;
    }

    @Parameterized.Parameters(name="expect trigger: {0}, expect abort {1}, numChannels: {3}, chan: {4}, events: {2}")
    public static Object[][] parameters() {
        return new Object[][]{{false, true, Arrays.asList(UnalignedCheckpointsCancellationTest.cancel(10), UnalignedCheckpointsCancellationTest.cancel(20)), 1, 0}, {false, true, Arrays.asList(UnalignedCheckpointsCancellationTest.cancel(20), UnalignedCheckpointsCancellationTest.cancel(10)), 1, 0}, {false, true, Arrays.asList(UnalignedCheckpointsCancellationTest.cancel(10), UnalignedCheckpointsCancellationTest.checkpoint(10)), 1, 0}, {true, true, Arrays.asList(UnalignedCheckpointsCancellationTest.cancel(10), UnalignedCheckpointsCancellationTest.checkpoint(20)), 1, 0}, {false, true, Arrays.asList(UnalignedCheckpointsCancellationTest.cancel(20), UnalignedCheckpointsCancellationTest.checkpoint(10)), 1, 0}, {true, false, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.checkpoint(10)), 1, 0}, {true, false, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.checkpoint(20)), 1, 0}, {true, true, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.checkpoint(20)), 2, 0}, {true, false, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(20), UnalignedCheckpointsCancellationTest.checkpoint(10)), 1, 0}, {true, false, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.cancel(10)), 1, 0}, {true, true, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.cancel(10)), 2, 0}, {true, true, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(10), UnalignedCheckpointsCancellationTest.cancel(20)), 1, 0}, {true, false, Arrays.asList(UnalignedCheckpointsCancellationTest.checkpoint(20), UnalignedCheckpointsCancellationTest.cancel(10)), 1, 0}};
    }

    @Test
    public void test() throws Exception {
        TestInvokable invokable = new TestInvokable();
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(this.numChannels).setChannelFactory(InputChannelBuilder::buildLocalChannel).build();
        SingleCheckpointBarrierHandler unaligner = SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, (String)"test", (AbstractInvokable)invokable, (Clock)SystemClock.getInstance(), (CheckpointableInput[])new CheckpointableInput[]{inputGate});
        for (RuntimeEvent e : this.events) {
            if (e instanceof CancelCheckpointMarker) {
                unaligner.processCancellationBarrier((CancelCheckpointMarker)e);
                continue;
            }
            if (e instanceof CheckpointBarrier) {
                unaligner.processBarrier((CheckpointBarrier)e, new InputChannelInfo(0, this.channel));
                continue;
            }
            throw new IllegalArgumentException("unexpected event type: " + e);
        }
        Assert.assertEquals((String)"expectAbortCheckpoint", (Object)this.expectAbortCheckpoint, (Object)invokable.checkpointAborted);
        Assert.assertEquals((String)"expectTriggerCheckpoint", (Object)this.expectTriggerCheckpoint, (Object)invokable.checkpointTriggered);
    }

    private static CheckpointBarrier checkpoint(int checkpointId) {
        return new CheckpointBarrier((long)checkpointId, 1L, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned());
    }

    private static CancelCheckpointMarker cancel(int checkpointId) {
        return new CancelCheckpointMarker((long)checkpointId);
    }

    private static class TestInvokable
    extends AbstractInvokable {
        private boolean checkpointAborted;
        private boolean checkpointTriggered;

        TestInvokable() {
            super((Environment)new DummyEnvironment());
        }

        public void invoke() {
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) {
            this.checkpointTriggered = true;
        }

        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            this.checkpointAborted = true;
        }
    }
}

