/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class ExecutionVertexInputConstraintTest
extends TestLogger {
    private ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    @Test
    public void testInputConsumable() throws Exception {
        List<JobVertex> vertices = ExecutionVertexInputConstraintTest.createOrderedVertices();
        ExecutionGraph eg = ExecutionVertexInputConstraintTest.createExecutionGraph(vertices, InputDependencyConstraint.ALL);
        ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
        ExecutionVertex ev32 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[1];
        eg.start(this.mainThreadExecutor);
        eg.scheduleForExecution();
        Assert.assertFalse((boolean)ev31.isInputConsumable(0));
        Assert.assertFalse((boolean)ev31.isInputConsumable(1));
        IntermediateResultPartition partition11 = (IntermediateResultPartition)ev11.getProducedPartitions().values().iterator().next();
        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), ev11.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertTrue((boolean)ev31.isInputConsumable(0));
        Assert.assertFalse((boolean)ev32.isInputConsumable(0));
        ev21.getCurrentExecutionAttempt().markFinished();
        Assert.assertFalse((boolean)ev31.isInputConsumable(1));
        ev22.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue((boolean)ev31.isInputConsumable(1));
        ev11.fail((Throwable)new Exception());
        this.waitUntilJobRestarted(eg);
        Assert.assertFalse((boolean)ev31.isInputConsumable(0));
        Assert.assertFalse((boolean)ev31.isInputConsumable(1));
    }

    @Test
    public void testInputConstraintANY() throws Exception {
        List<JobVertex> vertices = ExecutionVertexInputConstraintTest.createOrderedVertices();
        ExecutionGraph eg = ExecutionVertexInputConstraintTest.createExecutionGraph(vertices, InputDependencyConstraint.ANY);
        ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
        eg.start(this.mainThreadExecutor);
        eg.scheduleForExecution();
        Assert.assertFalse((boolean)ev31.checkInputDependencyConstraints());
        IntermediateResultPartition partition11 = (IntermediateResultPartition)ev11.getProducedPartitions().values().iterator().next();
        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), ev11.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertTrue((boolean)ev31.checkInputDependencyConstraints());
        ev11.fail((Throwable)new Exception());
        this.waitUntilJobRestarted(eg);
        Assert.assertFalse((boolean)ev31.checkInputDependencyConstraints());
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
        ev21.getCurrentExecutionAttempt().markFinished();
        ev22.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue((boolean)ev31.checkInputDependencyConstraints());
    }

    @Test
    public void testInputConstraintALL() throws Exception {
        List<JobVertex> vertices = ExecutionVertexInputConstraintTest.createOrderedVertices();
        ExecutionGraph eg = ExecutionVertexInputConstraintTest.createExecutionGraph(vertices, InputDependencyConstraint.ALL);
        ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
        ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
        eg.start(this.mainThreadExecutor);
        eg.scheduleForExecution();
        Assert.assertFalse((boolean)ev31.checkInputDependencyConstraints());
        IntermediateResultPartition partition11 = (IntermediateResultPartition)ev11.getProducedPartitions().values().iterator().next();
        ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(), ev11.getCurrentExecutionAttempt().getAttemptId()));
        Assert.assertFalse((boolean)ev31.checkInputDependencyConstraints());
        ev21.getCurrentExecutionAttempt().markFinished();
        ev22.getCurrentExecutionAttempt().markFinished();
        Assert.assertTrue((boolean)ev31.checkInputDependencyConstraints());
        ev11.fail((Throwable)new Exception());
        this.waitUntilJobRestarted(eg);
        Assert.assertFalse((boolean)ev31.checkInputDependencyConstraints());
    }

    @Test
    public void testInputConstraintALLPerformance() throws Exception {
        int parallelism = 1000;
        JobVertex v1 = ExecutionVertexInputConstraintTest.createVertexWithAllInputConstraints("vertex1", 1000);
        JobVertex v2 = ExecutionVertexInputConstraintTest.createVertexWithAllInputConstraints("vertex2", 1000);
        JobVertex v3 = ExecutionVertexInputConstraintTest.createVertexWithAllInputConstraints("vertex3", 1000);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v2.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ExecutionGraph eg = ExecutionVertexInputConstraintTest.createExecutionGraph(Arrays.asList(v1, v2, v3), InputDependencyConstraint.ALL, 3000);
        eg.start(this.mainThreadExecutor);
        eg.scheduleForExecution();
        for (int i = 0; i < 999; ++i) {
            ExecutionVertexInputConstraintTest.finishSubtask(eg, v1.getID(), i);
        }
        long startTime = System.nanoTime();
        ExecutionVertexInputConstraintTest.finishSubtask(eg, v1.getID(), 999);
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
        Duration timeout = Duration.ofSeconds(5L);
        Assert.assertThat((Object)duration, (Matcher)Matchers.lessThan((Comparable)timeout));
    }

    private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
        JobVertex v = new JobVertex(name);
        v.setParallelism(parallelism);
        v.setInvokableClass(AbstractInvokable.class);
        v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
        return v;
    }

    private static void finishSubtask(ExecutionGraph graph, JobVertexID jvId, int subtask) {
        ExecutionVertex[] vertices = graph.getJobVertex(jvId).getTaskVertices();
        graph.updateState(new TaskExecutionState(graph.getJobID(), vertices[subtask].getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED));
    }

    private static List<JobVertex> createOrderedVertices() {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        v1.setParallelism(2);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return Arrays.asList(v1, v2, v3);
    }

    private static ExecutionGraph createExecutionGraph(List<JobVertex> orderedVertices, InputDependencyConstraint inputDependencyConstraint) throws Exception {
        return ExecutionVertexInputConstraintTest.createExecutionGraph(orderedVertices, inputDependencyConstraint, 20);
    }

    private static ExecutionGraph createExecutionGraph(List<JobVertex> orderedVertices, InputDependencyConstraint inputDependencyConstraint, int numSlots) throws Exception {
        for (JobVertex vertex : orderedVertices) {
            vertex.setInputDependencyConstraint(inputDependencyConstraint);
        }
        JobGraph jobGraph = new JobGraph(orderedVertices.toArray(new JobVertex[0]));
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(numSlots);
        return TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setRestartStrategy(TestRestartStrategy.directExecuting()).setSlotProvider(slotProvider).build();
    }

    private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(eg, ExecutionGraphTestUtils.isInExecutionState(ExecutionState.CANCELING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.CANCELED)).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FAILED)).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED)), 2000L);
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            if (ev.getCurrentExecutionAttempt().getState() != ExecutionState.CANCELING) continue;
            ev.getCurrentExecutionAttempt().completeCancelling();
        }
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
    }
}

