/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class PipelinedRegionSchedulingStrategyTest
extends TestLogger {
    private TestingSchedulerOperations testingSchedulerOperation;
    private static final int PARALLELISM = 2;
    private TestingSchedulingTopology testingSchedulingTopology;
    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map;
    private List<TestingSchedulingExecutionVertex> sink;

    @Before
    public void setUp() {
        this.testingSchedulerOperation = new TestingSchedulerOperations();
        this.buildTopology();
    }

    private void buildTopology() {
        this.testingSchedulingTopology = new TestingSchedulingTopology();
        this.source = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.map = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.sink = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.testingSchedulingTopology.connectPointwise(this.source, this.map).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).finish();
        this.testingSchedulingTopology.connectAllToAll(this.map, this.sink).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
    }

    @Test
    public void testStartScheduling() {
        this.startScheduling(this.testingSchedulingTopology);
        ArrayList<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<List<TestingSchedulingExecutionVertex>>();
        expectedScheduledVertices.add(Arrays.asList(this.source.get(0), this.map.get(0)));
        expectedScheduledVertices.add(Arrays.asList(this.source.get(1), this.map.get(1)));
        this.assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
    }

    @Test
    public void testRestartTasks() {
        PipelinedRegionSchedulingStrategy schedulingStrategy = this.startScheduling(this.testingSchedulingTopology);
        Set verticesToRestart = Stream.of(this.source, this.map, this.sink).flatMap(Collection::stream).map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
        schedulingStrategy.restartTasks(verticesToRestart);
        ArrayList<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<List<TestingSchedulingExecutionVertex>>();
        expectedScheduledVertices.add(Arrays.asList(this.source.get(0), this.map.get(0)));
        expectedScheduledVertices.add(Arrays.asList(this.source.get(1), this.map.get(1)));
        this.assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
    }

    @Test
    public void testNotifyingBlockingResultPartitionProducerFinished() {
        PipelinedRegionSchedulingStrategy schedulingStrategy = this.startScheduling(this.testingSchedulingTopology);
        TestingSchedulingExecutionVertex map1 = this.map.get(0);
        map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
        schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);
        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), (Matcher)Matchers.hasSize((int)2));
        TestingSchedulingExecutionVertex map2 = this.map.get(1);
        map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
        schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);
        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), (Matcher)Matchers.hasSize((int)4));
        ArrayList<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<List<TestingSchedulingExecutionVertex>>();
        expectedScheduledVertices.add(Arrays.asList(this.sink.get(0)));
        expectedScheduledVertices.add(Arrays.asList(this.sink.get(1)));
        this.assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
    }

    private PipelinedRegionSchedulingStrategy startScheduling(TestingSchedulingTopology testingSchedulingTopology) {
        PipelinedRegionSchedulingStrategy schedulingStrategy = new PipelinedRegionSchedulingStrategy((SchedulerOperations)this.testingSchedulerOperation, (SchedulingTopology)testingSchedulingTopology);
        schedulingStrategy.startScheduling();
        return schedulingStrategy;
    }

    private void assertLatestScheduledVerticesAreEqualTo(List<List<TestingSchedulingExecutionVertex>> expected) {
        List<List<ExecutionVertexDeploymentOption>> deploymentOptions = this.testingSchedulerOperation.getScheduledVertices();
        int expectedScheduledBulks = expected.size();
        Assert.assertThat((Object)expectedScheduledBulks, (Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(deploymentOptions.size())));
        for (int i = 0; i < expectedScheduledBulks; ++i) {
            Assert.assertEquals(PipelinedRegionSchedulingStrategyTest.idsFromVertices(expected.get(expectedScheduledBulks - i - 1)), PipelinedRegionSchedulingStrategyTest.idsFromDeploymentOptions(deploymentOptions.get(deploymentOptions.size() - i - 1)));
        }
    }

    private static List<ExecutionVertexID> idsFromVertices(List<TestingSchedulingExecutionVertex> vertices) {
        return vertices.stream().map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toList());
    }

    private static List<ExecutionVertexID> idsFromDeploymentOptions(List<ExecutionVertexDeploymentOption> deploymentOptions) {
        return deploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList());
    }
}

