package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.HashSet;
import java.util.Iterator;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/** Tests the set of tasks that {@link RestartPipelinedRegionFailoverStrategy} selects for restart. */
public class RestartPipelinedRegionFailoverStrategyTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyTest$TestResultPartitionAvailabilityChecker.class */
    private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
        private final HashSet<IntermediateResultPartitionID> failedPartitions = new HashSet<>();

        public boolean isAvailable(IntermediateResultPartitionID intermediateResultPartitionID) {
            return !this.failedPartitions.contains(intermediateResultPartitionID);
        }

        public void markResultPartitionFailed(IntermediateResultPartitionID intermediateResultPartitionID) {
            this.failedPartitions.add(intermediateResultPartitionID);
        }

        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID intermediateResultPartitionID) {
            this.failedPartitions.remove(intermediateResultPartitionID);
        }
    }

    /**
     * Fluent helper that captures a failover strategy, the vertex that is assumed to have failed
     * and the failure cause, and then asserts which vertices the strategy wants to restart.
     *
     * <p>Instances are obtained through {@code verifyThatFailedExecution(...)}.
     */
    public static class VerificationContext {
        private final FailoverStrategy strategy;
        private final SchedulingExecutionVertex executionVertex;

        /** Failure cause handed to the strategy; a plain exception unless replaced. */
        private Throwable cause;

        /**
         * Creates a context whose failure cause is a plain {@code Exception("Test failure")}.
         *
         * @param failoverStrategy the strategy under test
         * @param schedulingExecutionVertex the vertex that is reported as failed
         */
        private VerificationContext(FailoverStrategy failoverStrategy, SchedulingExecutionVertex schedulingExecutionVertex) {
            this.cause = new Exception("Test failure");
            this.strategy = failoverStrategy;
            this.executionVertex = schedulingExecutionVertex;
        }

        /**
         * Replaces the failure cause with a {@link PartitionConnectionException} that refers to the
         * given partition, paired with a freshly created {@link ExecutionAttemptID}.
         *
         * @param schedulingResultPartition the partition named in the exception
         * @return this context, for chaining
         */
        public VerificationContext partitionConnectionCause(SchedulingResultPartition schedulingResultPartition) {
            ResultPartitionID failedPartition =
                    new ResultPartitionID(schedulingResultPartition.getId(), new ExecutionAttemptID());
            return cause(new PartitionConnectionException(failedPartition, new Exception("Test failure")));
        }

        /**
         * Replaces the failure cause.
         *
         * @param th the new failure cause
         * @return this context, for chaining
         */
        private VerificationContext cause(Throwable th) {
            this.cause = th;
            return this;
        }

        /**
         * Asserts that the strategy, given the captured vertex and cause, returns exactly the IDs of
         * the given vertices, in any order. Calling this with no vertices asserts an empty result.
         *
         * @param schedulingExecutionVertexArr the vertices expected to be restarted
         */
        public void restarts(SchedulingExecutionVertex... schedulingExecutionVertexArr) {
            // Stream the typed array directly: casting it to Object[] would erase the element type
            // and make getId() unresolvable on the stream elements.
            Object[] expectedIds =
                    Stream.of(schedulingExecutionVertexArr)
                            .map(SchedulingExecutionVertex::getId)
                            .toArray();
            MatcherAssert.assertThat(
                    this.strategy.getTasksNeedingRestart(this.executionVertex.getId(), this.cause),
                    Matchers.containsInAnyOrder(expectedIds));
        }
    }

    /**
     * Verifies the restart set when a vertex fails with a plain (non data-consumption) error.
     *
     * <p>Topology, all edges {@code BLOCKING}:
     *
     * <pre>
     *     v1 --+--> v4
     *          x
     *     v2 --+--> v5
     *
     *     v3 -----> v6
     * </pre>
     *
     * <p>The expected restart set is the failed vertex plus its direct consumers.
     */
    @Test
    public void testRegionFailoverForRegionInternalErrors() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        // Producers are finished, consumers are in mixed states.
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.SCHEDULED);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.RUNNING);

        ResultPartitionType blocking = ResultPartitionType.BLOCKING;
        topology.connect(v1, v4, blocking);
        topology.connect(v1, v5, blocking);
        topology.connect(v2, v4, blocking);
        topology.connect(v2, v5, blocking);
        topology.connect(v3, v6, blocking);

        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

        // Failing producers.
        verifyThatFailedExecution(strategy, v1).restarts(v1, v4, v5);
        verifyThatFailedExecution(strategy, v2).restarts(v2, v4, v5);
        verifyThatFailedExecution(strategy, v3).restarts(v3, v6);

        // Failing consumers only restart themselves.
        verifyThatFailedExecution(strategy, v4).restarts(v4);
        verifyThatFailedExecution(strategy, v5).restarts(v5);
        verifyThatFailedExecution(strategy, v6).restarts(v6);
    }

    /**
     * Verifies the restart set when a consumer fails with a {@link PartitionConnectionException}
     * that names one of the partitions it consumes.
     *
     * <p>Topology, all edges {@code BLOCKING}:
     *
     * <pre>
     *     v1 --+--> v4
     *          x
     *     v2 --+--> v5
     *
     *     v3 -----> v6
     * </pre>
     *
     * <p>The consumed partitions are taken in the iteration order of {@code getConsumedResults()};
     * the expectations below rely on the first one coming from {@code v1} and the second one from
     * {@code v2}.
     */
    @Test
    public void testRegionFailoverForDataConsumptionErrors() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.RUNNING);

        ResultPartitionType blocking = ResultPartitionType.BLOCKING;
        topology.connect(v1, v4, blocking);
        topology.connect(v1, v5, blocking);
        topology.connect(v2, v4, blocking);
        topology.connect(v2, v5, blocking);
        topology.connect(v3, v6, blocking);

        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

        // v4 fails while reading each of its two inputs in turn.
        Iterator<TestingSchedulingResultPartition> v4Inputs = v4.getConsumedResults().iterator();
        TestingSchedulingResultPartition v4FirstInput = v4Inputs.next();
        verifyThatFailedExecution(strategy, v4).partitionConnectionCause(v4FirstInput).restarts(v1, v4, v5);
        TestingSchedulingResultPartition v4SecondInput = v4Inputs.next();
        verifyThatFailedExecution(strategy, v4).partitionConnectionCause(v4SecondInput).restarts(v2, v4, v5);

        // v5 fails while reading each of its two inputs in turn.
        Iterator<TestingSchedulingResultPartition> v5Inputs = v5.getConsumedResults().iterator();
        TestingSchedulingResultPartition v5FirstInput = v5Inputs.next();
        verifyThatFailedExecution(strategy, v5).partitionConnectionCause(v5FirstInput).restarts(v1, v4, v5);
        TestingSchedulingResultPartition v5SecondInput = v5Inputs.next();
        verifyThatFailedExecution(strategy, v5).partitionConnectionCause(v5SecondInput).restarts(v2, v4, v5);

        // v6 fails while reading its only input.
        TestingSchedulingResultPartition v6Input = v6.getConsumedResults().iterator().next();
        verifyThatFailedExecution(strategy, v6).partitionConnectionCause(v6Input).restarts(v3, v6);
    }

    /**
     * Verifies the restart set for every combination of availability of the two partitions that
     * feed a single consumer.
     *
     * <p>Topology, all edges {@code BLOCKING}:
     *
     * <pre>
     *     v1 --rp1--+
     *               +--> v3
     *     v2 --rp2--+
     * </pre>
     *
     * <p>Availability is controlled through a {@link TestResultPartitionAvailabilityChecker} that is
     * reset before each combination.
     */
    @Test
    public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);

        topology.connect(v1, v3, ResultPartitionType.BLOCKING);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);

        TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
        RestartPipelinedRegionFailoverStrategy strategy =
                new RestartPipelinedRegionFailoverStrategy(topology, availabilityChecker);

        // Use the regular getId() accessor; the decompiler-generated alias is not a real method.
        IntermediateResultPartitionID rp1Id = v1.getProducedResults().iterator().next().getId();
        IntermediateResultPartitionID rp2Id = v2.getProducedResults().iterator().next().getId();

        // Combination 1: rp1 available, rp2 available.
        availabilityChecker.failedPartitions.clear();
        verifyThatFailedExecution(strategy, v1).restarts(v1, v3);
        verifyThatFailedExecution(strategy, v2).restarts(v2, v3);
        verifyThatFailedExecution(strategy, v3).restarts(v3);

        // Combination 2: rp1 failed, rp2 available.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1Id);
        verifyThatFailedExecution(strategy, v1).restarts(v1, v3);
        verifyThatFailedExecution(strategy, v2).restarts(v1, v2, v3);
        verifyThatFailedExecution(strategy, v3).restarts(v1, v3);

        // Combination 3: rp1 available, rp2 failed.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp2Id);
        verifyThatFailedExecution(strategy, v1).restarts(v1, v2, v3);
        verifyThatFailedExecution(strategy, v2).restarts(v2, v3);
        verifyThatFailedExecution(strategy, v3).restarts(v2, v3);

        // Combination 4: rp1 failed, rp2 failed.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1Id);
        availabilityChecker.markResultPartitionFailed(rp2Id);
        verifyThatFailedExecution(strategy, v1).restarts(v1, v2, v3);
        verifyThatFailedExecution(strategy, v2).restarts(v1, v2, v3);
        verifyThatFailedExecution(strategy, v3).restarts(v1, v2, v3);
    }

    /**
     * Verifies the restart set in a chain whose vertices are pairwise joined by pipelined edges.
     *
     * <p>Topology ({@code =>} is {@code PIPELINED}, {@code ->} is {@code BLOCKING}):
     *
     * <pre>
     *     v1 => v2 -> v3 => v4 -> v5 => v6
     * </pre>
     *
     * <p>A plain failure of {@code v3} is expected to restart {@code v3} through {@code v6}; a
     * partition connection failure on the input of {@code v3} is expected to restart all six
     * vertices.
     */
    @Test
    public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.FAILED);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.CANCELED);

        topology.connect(v1, v2, ResultPartitionType.PIPELINED);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);
        topology.connect(v3, v4, ResultPartitionType.PIPELINED);
        topology.connect(v4, v5, ResultPartitionType.BLOCKING);
        topology.connect(v5, v6, ResultPartitionType.PIPELINED);

        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

        // Plain failure of v3.
        verifyThatFailedExecution(strategy, v3).restarts(v3, v4, v5, v6);

        // v3 fails because the partition it consumes cannot be reached.
        TestingSchedulingResultPartition v3Input = v3.getConsumedResults().iterator().next();
        verifyThatFailedExecution(strategy, v3)
                .partitionConnectionCause(v3Input)
                .restarts(v1, v2, v3, v4, v5, v6);
    }

    /**
     * Verifies that nothing is selected for restart when all vertices are still in state
     * {@code CREATED}, for both a plain failure and a partition connection failure.
     *
     * <p>Topology: {@code v1 -> v2} with a {@code BLOCKING} edge.
     */
    @Test
    public void testRegionFailoverDoesNotRestartCreatedExecutions() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.CREATED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.CREATED);
        topology.connect(v1, v2, ResultPartitionType.BLOCKING);

        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

        // An empty varargs call asserts an empty restart set.
        verifyThatFailedExecution(strategy, v2).restarts();

        TestingSchedulingResultPartition v2Input = v2.getConsumedResults().iterator().next();
        verifyThatFailedExecution(strategy, v2).partitionConnectionCause(v2Input).restarts();
    }

    /**
     * Verifies the restart set for a diamond connected exclusively by
     * {@code PIPELINED_APPROXIMATE} edges.
     *
     * <pre>
     *          +--> v2 --+
     *     v1 --+         +--> v4
     *          +--> v3 --+
     * </pre>
     *
     * <p>The expected restart set is the failed vertex together with every vertex downstream of it.
     */
    @Test
    public void testRegionFailoverForPipelinedApproximate() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);

        ResultPartitionType approximate = ResultPartitionType.PIPELINED_APPROXIMATE;
        topology.connect(v1, v2, approximate);
        topology.connect(v1, v3, approximate);
        topology.connect(v2, v4, approximate);
        topology.connect(v3, v4, approximate);

        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

        verifyThatFailedExecution(strategy, v1).restarts(v1, v2, v3, v4);
        verifyThatFailedExecution(strategy, v2).restarts(v2, v4);
        verifyThatFailedExecution(strategy, v3).restarts(v3, v4);
        verifyThatFailedExecution(strategy, v4).restarts(v4);
    }

    /**
     * Starts a fluent verification for the case that the given vertex has failed.
     *
     * @param failoverStrategy the strategy under test
     * @param schedulingExecutionVertex the vertex that is reported as failed
     * @return a new {@link VerificationContext} bound to the strategy and the vertex
     */
    private static VerificationContext verifyThatFailedExecution(FailoverStrategy failoverStrategy, SchedulingExecutionVertex schedulingExecutionVertex) {
        VerificationContext context = new VerificationContext(failoverStrategy, schedulingExecutionVertex);
        return context;
    }
}
