/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

public class AdaptiveBatchSchedulerTest
extends TestLogger {
    private static final int SOURCE_PARALLELISM_1 = 6;
    private static final int SOURCE_PARALLELISM_2 = 4;
    private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    @Test
    public void testAdaptiveBatchScheduler() throws Exception {
        JobGraph jobGraph = this.createJobGraph(false);
        Iterator jobVertexIterator = jobGraph.getVertices().iterator();
        JobVertex source1 = (JobVertex)jobVertexIterator.next();
        JobVertex source2 = (JobVertex)jobVertexIterator.next();
        JobVertex sink = (JobVertex)jobVertexIterator.next();
        SchedulerBase scheduler = this.createScheduler(jobGraph);
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)-1));
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)-1));
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)10));
        MatcherAssert.assertThat((Object)sink.getParallelism(), (Matcher)Matchers.is((Object)10));
    }

    @Test
    public void testDecideParallelismForForwardTarget() throws Exception {
        JobGraph jobGraph = this.createJobGraph(true);
        Iterator jobVertexIterator = jobGraph.getVertices().iterator();
        JobVertex source1 = (JobVertex)jobVertexIterator.next();
        JobVertex source2 = (JobVertex)jobVertexIterator.next();
        JobVertex sink = (JobVertex)jobVertexIterator.next();
        SchedulerBase scheduler = this.createScheduler(jobGraph);
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)-1));
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)-1));
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
        MatcherAssert.assertThat((Object)sinkExecutionJobVertex.getParallelism(), (Matcher)Matchers.is((Object)6));
        MatcherAssert.assertThat((Object)sink.getParallelism(), (Matcher)Matchers.is((Object)6));
    }

    public static void transitionExecutionsState(SchedulerBase scheduler, ExecutionState state, List<Execution> executions) {
        for (Execution execution : executions) {
            scheduler.updateTaskExecutionState(new TaskExecutionState(execution.getAttemptId(), state, null, null, new IOMetrics(0L, 0L, 0L, 0L)));
        }
    }

    public static void transitionExecutionsState(SchedulerBase scheduler, ExecutionState state, JobVertex jobVertex) {
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        List<Execution> executions = Arrays.asList(executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()).stream().map(ExecutionVertex::getCurrentExecutionAttempt).collect(Collectors.toList());
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, state, executions);
    }

    public JobVertex createJobVertex(String jobVertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(jobVertexName);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }
        return jobVertex;
    }

    public JobGraph createJobGraph(boolean withForwardEdge) {
        JobVertex source1 = this.createJobVertex("source1", 6);
        JobVertex source2 = this.createJobVertex("source2", 4);
        JobVertex sink = this.createJobVertex("sink", -1);
        sink.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sink.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        if (withForwardEdge) {
            ((IntermediateDataSet)source1.getProducedDataSets().get(0)).getConsumer().setForward(true);
        }
        return new JobGraph(new JobID(), "test job", new JobVertex[]{source1, source2, sink});
    }

    public SchedulerBase createScheduler(JobGraph jobGraph) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.AdaptiveBatch);
        AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder schedulerBuilder = (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(jobGraph, mainThreadExecutor).setJobMasterConfiguration(configuration);
        schedulerBuilder.setVertexParallelismDecider(ignored -> 10);
        return schedulerBuilder.build();
    }
}

