package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/JobDispatcherITCase.class */
public class JobDispatcherITCase extends TestLogger {
    private static final Duration TIMEOUT = Duration.ofMinutes(10);

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/JobDispatcherITCase$AtLeastOneCheckpointInvokable.class */
    public static class AtLeastOneCheckpointInvokable extends AbstractInvokable {
        private static final long CANCEL_SIGNAL = -2;
        private final BlockingQueue<Long> checkpointsToConfirm;
        private static volatile CountDownLatch atLeastOneCheckpointCompleted;

        /* JADX INFO: Access modifiers changed from: private */
        public static void reset() {
            atLeastOneCheckpointCompleted = new CountDownLatch(1);
        }

        public AtLeastOneCheckpointInvokable(Environment environment) {
            super(environment);
            this.checkpointsToConfirm = new ArrayBlockingQueue(1);
        }

        public void invoke() throws Exception {
            long longValue = this.checkpointsToConfirm.take().longValue();
            while (true) {
                long j = longValue;
                if (j == CANCEL_SIGNAL) {
                    return;
                }
                getEnvironment().acknowledgeCheckpoint(j, new CheckpointMetrics());
                longValue = this.checkpointsToConfirm.take().longValue();
            }
        }

        public Future<Void> cancel() throws Exception {
            this.checkpointsToConfirm.add(Long.valueOf(CANCEL_SIGNAL));
            return FutureUtils.completedVoidFuture();
        }

        /* renamed from: triggerCheckpointAsync, reason: merged with bridge method [inline-methods] */
        public CompletableFuture<Boolean> m71triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.checkpointsToConfirm.add(Long.valueOf(checkpointMetaData.getCheckpointId()));
            return CompletableFuture.completedFuture(true);
        }

        public Future<Void> notifyCheckpointCompleteAsync(long j) {
            atLeastOneCheckpointCompleted.countDown();
            return CompletableFuture.completedFuture(null);
        }
    }

    private Supplier<DispatcherResourceManagerComponentFactory> createJobModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration) {
        return () -> {
            try {
                return new DefaultDispatcherResourceManagerComponentFactory(new DefaultDispatcherRunnerFactory(JobDispatcherLeaderProcessFactoryFactory.create(FileJobGraphRetriever.createFrom(configuration, (File) null))), StandaloneResourceManagerFactory.getInstance(), JobRestEndpointFactory.INSTANCE);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Test
    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception {
        Deadline fromNow = Deadline.fromNow(TIMEOUT);
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
        TestingMiniClusterConfiguration build = new TestingMiniClusterConfiguration.Builder().setConfiguration(configuration).build();
        EmbeddedHaServicesWithLeadershipControl embeddedHaServicesWithLeadershipControl = new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
        Configuration configuration2 = new Configuration(build.getConfiguration());
        JobID generateAndPersistJobGraph = generateAndPersistJobGraph(configuration2, 100L, TEMPORARY_FOLDER.newFolder().toPath());
        AtLeastOneCheckpointInvokable.reset();
        TestingMiniCluster testingMiniCluster = new TestingMiniCluster(build, () -> {
            return embeddedHaServicesWithLeadershipControl;
        }, createJobModeDispatcherResourceManagerComponentFactorySupplier(configuration2));
        Throwable th = null;
        try {
            try {
                testingMiniCluster.start();
                AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
                CompletableFuture requestJobResult = testingMiniCluster.requestJobResult(generateAndPersistJobGraph);
                embeddedHaServicesWithLeadershipControl.revokeDispatcherLeadership();
                Assert.assertEquals(ApplicationStatus.UNKNOWN, ((JobResult) requestJobResult.get()).getApplicationStatus());
                embeddedHaServicesWithLeadershipControl.grantDispatcherLeadership();
                awaitJobStatus(testingMiniCluster, generateAndPersistJobGraph, JobStatus.RUNNING, fromNow);
                Assert.assertNotNull(((AccessExecutionGraph) testingMiniCluster.getExecutionGraph(generateAndPersistJobGraph).get()).getCheckpointStatsSnapshot().getLatestRestoredCheckpoint());
                if (testingMiniCluster != null) {
                    if (0 == 0) {
                        testingMiniCluster.close();
                        return;
                    }
                    try {
                        testingMiniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (testingMiniCluster != null) {
                if (th != null) {
                    try {
                        testingMiniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    testingMiniCluster.close();
                }
            }
            throw th4;
        }
    }

    private JobID generateAndPersistJobGraph(Configuration configuration, long j, Path path) throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
        jobVertex.setParallelism(1);
        JobGraph build = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).setJobCheckpointingSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(j).build(), (SerializedValue) null)).build();
        Path resolve = path.resolve((String) FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.defaultValue());
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(resolve, StandardOpenOption.CREATE));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(build);
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                configuration.setString(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.key(), resolve.toString());
                return build.getJobID();
            } finally {
            }
        } catch (Throwable th3) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th3;
        }
    }

    private static void awaitJobStatus(MiniCluster miniCluster, JobID jobID, JobStatus jobStatus, Deadline deadline) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            try {
                return Boolean.valueOf(miniCluster.getJobStatus(jobID).get() == jobStatus);
            } catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }, deadline);
    }
}
