package org.apache.flink.runtime.dispatcher;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.class */
public class DispatcherFailoverITCase extends AbstractDispatcherTest {
    private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue();

    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.haServices.setCheckpointRecoveryFactory(new PerJobCheckpointRecoveryFactory((num, embeddedCompletedCheckpointStore) -> {
            if (embeddedCompletedCheckpointStore == null) {
                return new EmbeddedCompletedCheckpointStore(num.intValue());
            }
            Assert.assertFalse(embeddedCompletedCheckpointStore.getShutdownStatus().isPresent());
            Assert.assertFalse(embeddedCompletedCheckpointStore.getAllCheckpoints().isEmpty());
            return new EmbeddedCompletedCheckpointStore(num.intValue(), embeddedCompletedCheckpointStore.getAllCheckpoints());
        }));
    }

    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @After
    public void tearDown() {
        while (!this.toTerminate.isEmpty()) {
            try {
                RpcUtils.terminateRpcEndpoint(this.toTerminate.poll(), TIMEOUT);
            } catch (Exception e) {
            }
        }
    }

    @Test
    public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() throws Exception {
        DispatcherGateway selfGateway;
        Throwable th;
        JobGraph createJobGraph = createJobGraph();
        JobID jobID = createJobGraph.getJobID();
        Error error = new Error("Unable to remove job graph.");
        TestingJobGraphStore build = TestingJobGraphStore.newBuilder().setRemoveJobGraphConsumer(jobID2 -> {
            throw error;
        }).build();
        build.start(null);
        this.haServices.setJobGraphStore(build);
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        this.haServices.setJobMasterLeaderElectionService(jobID, testingLeaderElectionService);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        RpcEndpoint createRecoveredDispatcher = createRecoveredDispatcher(th2 -> {
            Optional findThrowable = ExceptionUtils.findThrowable(th2, Error.class);
            if (findThrowable.isPresent() && error.equals(findThrowable.get())) {
                countDownLatch.countDown();
            } else {
                this.testingFatalErrorHandlerResource.getFatalErrorHandler().onFatalError(th2);
            }
        });
        this.toTerminate.add(createRecoveredDispatcher);
        testingLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway2 = createRecoveredDispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway2.submitJob(createJobGraph, TIMEOUT).get();
        JobMasterTester jobMasterTester = new JobMasterTester(rpcService, jobID, connectToLeadingJobMaster(testingLeaderElectionService).get());
        Throwable th3 = null;
        try {
            try {
                CompletableFuture<List<TaskDeploymentDescriptor>> deployVertices = jobMasterTester.deployVertices(2);
                awaitStatus(selfGateway2, jobID, JobStatus.RUNNING);
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.INITIALIZING).get();
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.RUNNING).get();
                jobMasterTester.getCheckpointFuture(1L).get();
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.FINISHED).get();
                if (jobMasterTester != null) {
                    if (0 != 0) {
                        try {
                            jobMasterTester.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        jobMasterTester.close();
                    }
                }
                awaitStatus(selfGateway2, jobID, JobStatus.FINISHED);
                countDownLatch.await();
                testingLeaderElectionService.notLeader();
                testingLeaderElectionService.stop();
                RpcEndpoint createRecoveredDispatcher2 = createRecoveredDispatcher(null);
                this.toTerminate.add(createRecoveredDispatcher2);
                selfGateway = createRecoveredDispatcher2.getSelfGateway(DispatcherGateway.class);
                testingLeaderElectionService.isLeader(UUID.randomUUID());
                jobMasterTester = new JobMasterTester(rpcService, jobID, connectToLeadingJobMaster(testingLeaderElectionService).get());
                th = null;
            } catch (Throwable th5) {
                th3 = th5;
                throw th5;
            }
            try {
                try {
                    CompletableFuture<List<TaskDeploymentDescriptor>> deployVertices2 = jobMasterTester.deployVertices(2);
                    awaitStatus(selfGateway, jobID, JobStatus.RUNNING);
                    Assert.assertTrue("Job has recovered from checkpoint.", deployVertices2.get().stream().map((v0) -> {
                        return v0.getTaskRestore();
                    }).filter((v0) -> {
                        return Objects.nonNull(v0);
                    }).findAny().isPresent());
                    if (jobMasterTester != null) {
                        if (0 == 0) {
                            jobMasterTester.close();
                            return;
                        }
                        try {
                            jobMasterTester.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    }
                } catch (Throwable th7) {
                    th = th7;
                    throw th7;
                }
            } finally {
            }
        } finally {
        }
    }

    private JobGraph createJobGraph() {
        JobVertex jobVertex = new JobVertex("first");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);
        JobVertex jobVertex2 = new JobVertex("second");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).addJobVertex(jobVertex2).setJobCheckpointingSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(20L).setMinPauseBetweenCheckpoints(20L).setCheckpointTimeout(10000L).build(), (SerializedValue) null)).build();
    }

    private TestingDispatcher createRecoveredDispatcher(@Nullable FatalErrorHandler fatalErrorHandler) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.haServices.getJobGraphStore().getJobIds().iterator();
        while (it.hasNext()) {
            arrayList.add(this.haServices.getJobGraphStore().recoverJobGraph((JobID) it.next()));
        }
        this.haServices.setRunningJobsRegistry(new StandaloneRunningJobsRegistry());
        TestingDispatcher build = new AbstractDispatcherTest.TestingDispatcherBuilder().setJobManagerRunnerFactory(JobMasterServiceLeadershipRunnerFactory.INSTANCE).setJobGraphWriter(this.haServices.getJobGraphStore()).setInitialJobGraphs(arrayList).setFatalErrorHandler(fatalErrorHandler == null ? this.testingFatalErrorHandlerResource.getFatalErrorHandler() : fatalErrorHandler).build();
        build.start();
        return build;
    }

    private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(TestingLeaderElectionService testingLeaderElectionService) {
        return testingLeaderElectionService.getConfirmationFuture().thenCompose(leaderConnectionInfo -> {
            return rpcService.connect(leaderConnectionInfo.getAddress(), JobMasterId.fromUuidOrNull(leaderConnectionInfo.getLeaderSessionId()), JobMasterGateway.class);
        });
    }
}
