package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestFailoverTopology;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.class */
public class RestartPipelinedRegionStrategyTest extends TestLogger {

    /**
     * Test-only {@link ResultPartitionAvailabilityChecker} backed by an in-memory set of
     * partition IDs that have been flagged as failed. Any partition whose ID is not in that
     * set is reported as available.
     */
    private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {

        /** IDs of the partitions currently flagged as failed. */
        private final HashSet<IntermediateResultPartitionID> failedPartitions;

        TestResultPartitionAvailabilityChecker() {
            failedPartitions = new HashSet<>();
        }

        /**
         * Reports whether the given partition is available.
         *
         * @param intermediateResultPartitionID the partition to look up
         * @return {@code false} if the partition has been flagged as failed, {@code true} otherwise
         */
        public boolean isAvailable(IntermediateResultPartitionID intermediateResultPartitionID) {
            boolean flaggedAsFailed = failedPartitions.contains(intermediateResultPartitionID);
            return !flaggedAsFailed;
        }

        /**
         * Flags the given partition as failed, so that {@link #isAvailable} returns {@code false} for it.
         *
         * @param intermediateResultPartitionID the partition to flag
         */
        public void markResultPartitionFailed(IntermediateResultPartitionID intermediateResultPartitionID) {
            failedPartitions.add(intermediateResultPartitionID);
        }

        /**
         * Clears the failed flag of the given partition, if it was set.
         *
         * @param intermediateResultPartitionID the partition to un-flag
         */
        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID intermediateResultPartitionID) {
            failedPartitions.remove(intermediateResultPartitionID);
        }
    }

    /**
     * Checks the restart set returned by {@code getTasksNeedingRestart} when a task fails with a
     * plain {@link Exception} (i.e. not a partition-related error).
     *
     * <p>Topology built by this test; every edge is {@code BLOCKING}:
     * <pre>
     *     (v1) -+-> (v4)
     *           x
     *     (v2) -+-> (v5)
     *
     *     (v3) ---> (v6)
     * </pre>
     *
     * <p>The assertions expect a failing producer to be restarted together with the consumers it
     * is connected to, and a failing consumer to be restarted on its own.
     */
    @Test
    public void testRegionFailoverForRegionInternalErrors() throws Exception {
        TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();

        topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);

        RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topologyBuilder.build());

        // Each expectation is a fresh, type-inferred set rather than one raw HashSet that is
        // cleared and refilled between assertions.

        // Producers: v1 and v2 each feed both v4 and v5; v3 feeds only v6.
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v1.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(v1.m89getId(), new Exception("Test failure")));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v2.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(v2.m89getId(), new Exception("Test failure")));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v3.m89getId(), v6.m89getId())),
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")));

        // Consumers: only the failed vertex itself is expected back.
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v4.m89getId())),
                strategy.getTasksNeedingRestart(v4.m89getId(), new Exception("Test failure")));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v5.m89getId())),
                strategy.getTasksNeedingRestart(v5.m89getId(), new Exception("Test failure")));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v6.m89getId())),
                strategy.getTasksNeedingRestart(v6.m89getId(), new Exception("Test failure")));
    }

    /**
     * Checks the restart set returned by {@code getTasksNeedingRestart} when a consumer fails
     * with a partition-related error ({@link PartitionConnectionException} or
     * {@link PartitionNotFoundException}) that names one of its consumed partitions.
     *
     * <p>Topology built by this test; every edge is {@code BLOCKING}:
     * <pre>
     *     (v1) -+-> (v4)
     *           x
     *     (v2) -+-> (v5)
     *
     *     (v3) ---> (v6)
     * </pre>
     *
     * <p>The assertions expect the producer of the named partition to be restarted along with
     * that producer's consumers.
     *
     * <p>NOTE(review): the expectations assume that iterating {@code getConsumedResults()}
     * yields partitions in the order the edges were connected (v1's partition before v2's);
     * confirm against {@code TestFailoverTopology}.
     */
    @Test
    public void testRegionFailoverForDataConsumptionErrors() throws Exception {
        TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();

        topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);

        RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topologyBuilder.build());

        // Each expectation is a fresh, type-inferred set rather than one raw HashSet that is
        // cleared and refilled between assertions.

        // v4 fails on its first, then on its second consumed partition.
        Iterator<TestFailoverTopology.TestFailoverResultPartition> v4Inputs = v4.getConsumedResults().iterator();
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v1.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(
                        v4.m89getId(),
                        new PartitionConnectionException(
                                new ResultPartitionID(v4Inputs.next().m88getId(), new ExecutionAttemptID()),
                                new Exception("Test failure"))));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v2.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(
                        v4.m89getId(),
                        new PartitionNotFoundException(
                                new ResultPartitionID(v4Inputs.next().m88getId(), new ExecutionAttemptID()))));

        // v5 fails on its first, then on its second consumed partition.
        Iterator<TestFailoverTopology.TestFailoverResultPartition> v5Inputs = v5.getConsumedResults().iterator();
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v1.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(
                        v5.m89getId(),
                        new PartitionConnectionException(
                                new ResultPartitionID(v5Inputs.next().m88getId(), new ExecutionAttemptID()),
                                new Exception("Test failure"))));
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v2.m89getId(), v4.m89getId(), v5.m89getId())),
                strategy.getTasksNeedingRestart(
                        v5.m89getId(),
                        new PartitionNotFoundException(
                                new ResultPartitionID(v5Inputs.next().m88getId(), new ExecutionAttemptID()))));

        // v6 fails on its only consumed partition, the one produced by v3.
        Iterator<TestFailoverTopology.TestFailoverResultPartition> v6Inputs = v6.getConsumedResults().iterator();
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v3.m89getId(), v6.m89getId())),
                strategy.getTasksNeedingRestart(
                        v6.m89getId(),
                        new PartitionConnectionException(
                                new ResultPartitionID(v6Inputs.next().m88getId(), new ExecutionAttemptID()),
                                new Exception("Test failure"))));
    }

    /**
     * Checks the restart set returned by {@code getTasksNeedingRestart} for every combination
     * of the two upstream partitions being flagged as failed in the availability checker.
     *
     * <p>Topology built by this test; both edges are {@code BLOCKING}:
     * <pre>
     *     (v1) --+
     *            +--> (v3)
     *     (v2) --+
     * </pre>
     *
     * <p>Each of the four scenarios resets the checker's failed set, flags zero, one or both
     * produced partitions as failed, and then fails v1, v2 and v3 in turn with a plain
     * {@link Exception}. The assertions expect the producer of a flagged partition to be added
     * to the restart set whenever v3 is part of it.
     */
    @Test
    public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
        TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
        topologyBuilder.connect(v1, v3, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v2, v3, ResultPartitionType.BLOCKING);
        TestFailoverTopology topology = topologyBuilder.build();

        TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
        RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology, availabilityChecker);

        IntermediateResultPartitionID v1Output = v1.getProducedResults().iterator().next().m88getId();
        IntermediateResultPartitionID v2Output = v2.getProducedResults().iterator().next().m88getId();

        // Scenario 1: no partition is flagged as failed.
        availabilityChecker.failedPartitions.clear();
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v1.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v2.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v3.m89getId()));

        // Scenario 2: only the partition produced by v1 is flagged as failed.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(v1Output);
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v1.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v2.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v3.m89getId()));

        // Scenario 3: only the partition produced by v2 is flagged as failed.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(v2Output);
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v1.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v2.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v2.m89getId(), v3.m89getId()));

        // Scenario 4: both produced partitions are flagged as failed.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(v1Output);
        availabilityChecker.markResultPartitionFailed(v2Output);
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v1.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v2.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v2.m89getId(), v3.m89getId()));
        MatcherAssert.assertThat(
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")),
                Matchers.containsInAnyOrder(v1.m89getId(), v2.m89getId(), v3.m89getId()));
    }

    /**
     * Checks the restart set returned by {@code getTasksNeedingRestart} on a chain whose edges
     * alternate between {@code PIPELINED} and {@code BLOCKING}.
     *
     * <p>Topology built by this test:
     * <pre>
     *     (v1) --pipelined--> (v2) --blocking--> (v3) --pipelined--> (v4) --blocking--> (v5) --pipelined--> (v6)
     * </pre>
     *
     * <p>The assertions expect that v3 failing with a plain {@link Exception} restarts v3
     * through v6, and that v3 failing with a {@link PartitionConnectionException} naming its
     * consumed partition restarts all six vertices.
     */
    @Test
    public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
        TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
        TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
        TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();

        topologyBuilder.connect(v1, v2, ResultPartitionType.PIPELINED);
        topologyBuilder.connect(v2, v3, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v3, v4, ResultPartitionType.PIPELINED);
        topologyBuilder.connect(v4, v5, ResultPartitionType.BLOCKING);
        topologyBuilder.connect(v5, v6, ResultPartitionType.PIPELINED);

        RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topologyBuilder.build());

        // Each expectation is a fresh, type-inferred set rather than one raw HashSet that is
        // cleared and refilled between assertions.

        // Plain failure of v3: v3 and everything downstream of it is expected to restart.
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(v3.m89getId(), v4.m89getId(), v5.m89getId(), v6.m89getId())),
                strategy.getTasksNeedingRestart(v3.m89getId(), new Exception("Test failure")));

        // v3 fails on its consumed partition: the upstream vertices v1 and v2 are expected as well.
        Assert.assertEquals(
                new HashSet<>(Arrays.asList(
                        v1.m89getId(), v2.m89getId(), v3.m89getId(),
                        v4.m89getId(), v5.m89getId(), v6.m89getId())),
                strategy.getTasksNeedingRestart(
                        v3.m89getId(),
                        new PartitionConnectionException(
                                new ResultPartitionID(
                                        v3.getConsumedResults().iterator().next().m88getId(),
                                        new ExecutionAttemptID()),
                                new Exception("Test failure"))));
    }
}
