package org.apache.flink.runtime.scheduler.exceptionhistory;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.class */
class FailureHandlingResultSnapshotTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ExecutionGraph executionGraph;

    FailureHandlingResultSnapshotTest() {
    }

    @BeforeEach
    void setup() throws JobException, JobExecutionException {
        JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        singleNoOpJobGraph.getVertices().forEach(jobVertex -> {
            jobVertex.setParallelism(3);
        });
        this.executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(singleNoOpJobGraph).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        this.executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
    }

    @Test
    void testRootCauseVertexNotFailed() {
        FailureHandlingResult restartable = FailureHandlingResult.restartable(extractExecutionVertex(0).getCurrentExecutionAttempt(), new RuntimeException("Expected exception: root cause"), System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS, (Set) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getID();
        }).collect(Collectors.toSet()), 0L, false, true);
        Assertions.assertThatThrownBy(() -> {
            FailureHandlingResultSnapshot.create(restartable, this::getCurrentExecutions);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testMissingThrowableHandling() throws ExecutionException, InterruptedException {
        ExecutionVertex extractExecutionVertex = extractExecutionVertex(0);
        long triggerFailure = triggerFailure(extractExecutionVertex, null);
        FailureHandlingResult restartable = FailureHandlingResult.restartable(extractExecutionVertex.getCurrentExecutionAttempt(), (Throwable) null, triggerFailure, CompletableFuture.completedFuture(Collections.singletonMap("key2", "value2")), (Set) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getID();
        }).collect(Collectors.toSet()), 0L, false, true);
        Assertions.assertThat((Map) restartable.getFailureLabels().get()).isEqualTo(Collections.singletonMap("key2", "value2"));
        FailureHandlingResultSnapshot create = FailureHandlingResultSnapshot.create(restartable, this::getCurrentExecutions);
        Throwable deserializeError = new SerializedThrowable(create.getRootCause()).deserializeError(ClassLoader.getSystemClassLoader());
        Assertions.assertThat(deserializeError).isInstanceOf(FlinkException.class);
        Assertions.assertThat(deserializeError).hasMessageContaining(ErrorInfo.handleMissingThrowable((Throwable) null).getMessage());
        Assertions.assertThat(create.getTimestamp()).isEqualTo(triggerFailure);
        Assertions.assertThat(create.getRootCauseExecution()).isPresent();
        Assertions.assertThat((Execution) create.getRootCauseExecution().get()).isSameAs(extractExecutionVertex.getCurrentExecutionAttempt());
        Assertions.assertThat(create.isRootCause()).isTrue();
    }

    @Test
    void testLocalFailureHandlingResultSnapshotCreation() {
        ExecutionVertex extractExecutionVertex = extractExecutionVertex(0);
        RuntimeException runtimeException = new RuntimeException("Expected exception: root cause");
        ExecutionVertex extractExecutionVertex2 = extractExecutionVertex(1);
        IllegalStateException illegalStateException = new IllegalStateException("Expected exception: other failure");
        long triggerFailure = triggerFailure(extractExecutionVertex, runtimeException);
        triggerFailure(extractExecutionVertex2, illegalStateException);
        FailureHandlingResultSnapshot create = FailureHandlingResultSnapshot.create(FailureHandlingResult.restartable(extractExecutionVertex.getCurrentExecutionAttempt(), runtimeException, triggerFailure, FailureEnricherUtils.EMPTY_FAILURE_LABELS, (Set) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getID();
        }).collect(Collectors.toSet()), 0L, false, true), this::getCurrentExecutions);
        Assertions.assertThat(create.getRootCause()).isSameAs(runtimeException);
        Assertions.assertThat(create.getTimestamp()).isEqualTo(triggerFailure);
        Assertions.assertThat(create.getRootCauseExecution()).isPresent();
        Assertions.assertThat((Execution) create.getRootCauseExecution().get()).isSameAs(extractExecutionVertex.getCurrentExecutionAttempt());
        Assertions.assertThat(create.getConcurrentlyFailedExecution()).containsExactly(new Execution[]{extractExecutionVertex2.getCurrentExecutionAttempt()});
        Assertions.assertThat(create.isRootCause()).isTrue();
    }

    @Test
    void testFailureHandlingWithRootCauseExecutionBeingPartOfConcurrentlyFailedExecutions() {
        Execution currentExecutionAttempt = extractExecutionVertex(0).getCurrentExecutionAttempt();
        Assertions.assertThatThrownBy(() -> {
            new FailureHandlingResultSnapshot(currentExecutionAttempt, new RuntimeException("Expected exception"), System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS, Collections.singleton(currentExecutionAttempt), true);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testGlobalFailureHandlingResultSnapshotCreation() throws ExecutionException, InterruptedException {
        FlinkException flinkException = new FlinkException("Expected exception: root cause");
        long currentTimeMillis = System.currentTimeMillis();
        ExecutionVertex extractExecutionVertex = extractExecutionVertex(0);
        RuntimeException runtimeException = new RuntimeException("Expected exception: failure #0");
        ExecutionVertex extractExecutionVertex2 = extractExecutionVertex(1);
        IllegalStateException illegalStateException = new IllegalStateException("Expected exception: failure #1");
        triggerFailure(extractExecutionVertex, runtimeException);
        triggerFailure(extractExecutionVertex2, illegalStateException);
        FailureHandlingResult restartable = FailureHandlingResult.restartable((Execution) null, flinkException, currentTimeMillis, CompletableFuture.completedFuture(Collections.singletonMap("key2", "value2")), (Set) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getID();
        }).collect(Collectors.toSet()), 0L, true, true);
        Assertions.assertThat((Map) restartable.getFailureLabels().get()).isEqualTo(Collections.singletonMap("key2", "value2"));
        FailureHandlingResultSnapshot create = FailureHandlingResultSnapshot.create(restartable, this::getCurrentExecutions);
        Assertions.assertThat(create.getRootCause()).isSameAs(flinkException);
        Assertions.assertThat(create.getTimestamp()).isEqualTo(currentTimeMillis);
        Assertions.assertThat(create.getRootCauseExecution()).isNotPresent();
        Assertions.assertThat(create.getConcurrentlyFailedExecution()).containsExactlyInAnyOrder(new Execution[]{extractExecutionVertex.getCurrentExecutionAttempt(), extractExecutionVertex2.getCurrentExecutionAttempt()});
        Assertions.assertThat(create.isRootCause()).isTrue();
    }

    private Collection<Execution> getCurrentExecutions(ExecutionVertexID executionVertexID) {
        if (!this.executionGraph.getAllVertices().containsKey(executionVertexID.getJobVertexId())) {
            throw new IllegalArgumentException("The ExecutionJobVertex having the ID " + executionVertexID.getJobVertexId() + " does not exist.");
        }
        ExecutionVertex[] taskVertices = ((ExecutionJobVertex) this.executionGraph.getAllVertices().get(executionVertexID.getJobVertexId())).getTaskVertices();
        if (taskVertices.length <= executionVertexID.getSubtaskIndex()) {
            throw new IllegalArgumentException("The ExecutionVertex having the subtask ID " + executionVertexID.getSubtaskIndex() + " for ExecutionJobVertex " + executionVertexID.getJobVertexId() + " does not exist.");
        }
        return taskVertices[executionVertexID.getSubtaskIndex()].getCurrentExecutions();
    }

    private long triggerFailure(ExecutionVertex executionVertex, Throwable th) {
        this.executionGraph.updateState(new TaskExecutionStateTransition(new TaskExecutionState(executionVertex.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, th)));
        return ((ErrorInfo) executionVertex.getFailureInfo().orElseThrow(() -> {
            return new IllegalArgumentException("The transition into failed state didn't succeed for ExecutionVertex " + executionVertex.getID() + ".");
        })).getTimestamp();
    }

    private ExecutionVertex extractExecutionVertex(int i) {
        ExecutionVertex executionVertex = (ExecutionVertex) Iterables.get(this.executionGraph.getAllExecutionVertices(), i);
        executionVertex.tryAssignResource(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
        return executionVertex;
    }
}
