/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.List;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link RegionPartitionReleaseStrategy}.
 *
 * <p>Each test builds a small topology with a fresh {@link TestingSchedulingTopology}, reports
 * vertex state changes to the strategy, and checks which result partitions the strategy returns
 * for release.
 */
public class RegionPartitionReleaseStrategyTest extends TestLogger {

    /** Topology under construction; re-created before every test so tests stay independent. */
    private TestingSchedulingTopology testingSchedulingTopology;

    @Before
    public void setUp() throws Exception {
        testingSchedulingTopology = new TestingSchedulingTopology();
    }

    /**
     * A producer is connected pointwise to a consumer. Once the consumer is reported as finished,
     * the partition between them must be returned for release.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionIsFinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingResultPartition> resultPartitions =
                testingSchedulingTopology.connectPointwise(producers, consumers).finish();

        // NOTE(review): "only" assumes the builder's default parallelism is 1 -- confirm.
        final ExecutionVertexID onlyConsumerVertexId = consumers.get(0).getId();
        final IntermediateResultPartitionID onlyResultPartitionId = resultPartitions.get(0).getId();

        final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy =
                new RegionPartitionReleaseStrategy(testingSchedulingTopology);

        final List<IntermediateResultPartitionID> partitionsToRelease =
                regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId);

        Assert.assertThat(partitionsToRelease, Matchers.contains(onlyResultPartitionId));
    }

    /**
     * Source, intermediate and sink vertices are chained; the intermediate-to-sink edge is
     * explicitly {@link ResultPartitionType#PIPELINED}. After the intermediate vertex and then the
     * sink vertex are reported as finished, the source's result partition must be returned for
     * release.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
        final List<TestingSchedulingExecutionVertex> sourceVertices =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> intermediateVertices =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> sinkVertices =
                testingSchedulingTopology.addExecutionVertices().finish();

        // NOTE(review): this edge uses the builder's default partition type (presumably a
        // blocking one, so that source and downstream end up in different regions) -- confirm.
        final List<TestingSchedulingResultPartition> sourceResultPartitions =
                testingSchedulingTopology.connectAllToAll(sourceVertices, intermediateVertices).finish();
        testingSchedulingTopology
                .connectAllToAll(intermediateVertices, sinkVertices)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();

        final ExecutionVertexID onlyIntermediateVertexId = intermediateVertices.get(0).getId();
        final ExecutionVertexID onlySinkVertexId = sinkVertices.get(0).getId();
        final IntermediateResultPartitionID onlySourceResultPartitionId =
                sourceResultPartitions.get(0).getId();

        final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy =
                new RegionPartitionReleaseStrategy(testingSchedulingTopology);

        // Only the result of the second notification is asserted on.
        regionPartitionReleaseStrategy.vertexFinished(onlyIntermediateVertexId);
        final List<IntermediateResultPartitionID> partitionsToRelease =
                regionPartitionReleaseStrategy.vertexFinished(onlySinkVertexId);

        Assert.assertThat(partitionsToRelease, Matchers.contains(onlySourceResultPartitionId));
    }

    /**
     * A producer feeds two consumer subtasks (parallelism 2). Reporting only the first consumer
     * as finished must not return any partition for release.
     */
    @Test
    public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();

        final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy =
                new RegionPartitionReleaseStrategy(testingSchedulingTopology);

        final List<IntermediateResultPartitionID> partitionsToRelease =
                regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);

        Assert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }

    /**
     * Both consumer subtasks are reported as finished, then the second one is reported as
     * unfinished again. Re-reporting the first consumer as finished afterwards must not return
     * any partition for release.
     */
    @Test
    public void toggleVertexFinishedUnfinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        final ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
        final ExecutionVertexID consumerVertex2 = consumers.get(1).getId();

        final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy =
                new RegionPartitionReleaseStrategy(testingSchedulingTopology);

        regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);
        regionPartitionReleaseStrategy.vertexFinished(consumerVertex2);
        regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2);

        // consumerVertex2 is unfinished at this point, so nothing may be released.
        final List<IntermediateResultPartitionID> partitionsToRelease =
                regionPartitionReleaseStrategy.vertexFinished(consumerVertex1);

        Assert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }
}

