package org.apache.flink.runtime.scheduler;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.TestUtils;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.class */
class DefaultExecutionGraphFactoryTest {

    @TempDir
    private File tempDir;
    private File temporaryFile;
    private static final Logger log = LoggerFactory.getLogger(DefaultExecutionGraphFactoryTest.class);

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    DefaultExecutionGraphFactoryTest() {
    }

    @BeforeEach
    private void setup() {
        this.temporaryFile = new File(this.tempDir.getAbsolutePath(), "stateFile");
    }

    @Test
    void testRestoringModifiedJobFromSavepointFails() throws Exception {
        JobGraph createJobGraphWithSavepoint = createJobGraphWithSavepoint(false, 42L, 1);
        ExecutionGraphFactory createExecutionGraphFactory = createExecutionGraphFactory();
        Assertions.assertThatThrownBy(() -> {
            createExecutionGraphFactory.createAndRestoreExecutionGraph(createJobGraphWithSavepoint, new StandaloneCompletedCheckpointStore(1), new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), NoOpCheckpointStatsTracker.INSTANCE, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), SchedulerBase.computeVertexParallelismStore(createJobGraphWithSavepoint), (executionAttemptID, executionState, executionState2) -> {
            }, resultPartitionType -> {
                return false;
            }, NonAdaptiveExecutionPlanSchedulingContext.INSTANCE, log);
        }).withFailMessage("Expected ExecutionGraph creation to fail because of non restored state.", new Object[0]).isInstanceOf(Exception.class).hasMessageContaining("Failed to rollback to checkpoint/savepoint");
    }

    @Test
    void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() throws Exception {
        JobGraph createJobGraphWithSavepoint = createJobGraphWithSavepoint(true, 42L, 1);
        ExecutionGraphFactory createExecutionGraphFactory = createExecutionGraphFactory();
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        createExecutionGraphFactory.createAndRestoreExecutionGraph(createJobGraphWithSavepoint, standaloneCompletedCheckpointStore, new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), NoOpCheckpointStatsTracker.INSTANCE, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), SchedulerBase.computeVertexParallelismStore(createJobGraphWithSavepoint), (executionAttemptID, executionState, executionState2) -> {
        }, resultPartitionType -> {
            return false;
        }, NonAdaptiveExecutionPlanSchedulingContext.INSTANCE, log);
        CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
        Assertions.assertThat(latestCheckpoint).isNotNull();
        Assertions.assertThat(latestCheckpoint.getCheckpointID()).isEqualTo(42L);
    }

    @Test
    void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception {
        JobGraph createJobGraphWithSavepoint = createJobGraphWithSavepoint(true, 42L, 2);
        final ArrayList arrayList = new ArrayList();
        UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup unregisteredJobManagerJobMetricGroup = new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() { // from class: org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactoryTest.1
            public void addSpan(SpanBuilder spanBuilder) {
                arrayList.add(spanBuilder.build());
            }
        };
        ExecutionGraphFactory createExecutionGraphFactory = createExecutionGraphFactory(unregisteredJobManagerJobMetricGroup);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        DefaultCheckpointStatsTracker defaultCheckpointStatsTracker = new DefaultCheckpointStatsTracker(10, unregisteredJobManagerJobMetricGroup);
        ExecutionGraph createAndRestoreExecutionGraph = createExecutionGraphFactory.createAndRestoreExecutionGraph(createJobGraphWithSavepoint, standaloneCompletedCheckpointStore, new CheckpointsCleaner(), new StandaloneCheckpointIDCounter(), defaultCheckpointStatsTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), new VertexParallelismStore() { // from class: org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactoryTest.2
            public VertexParallelismInformation getParallelismInfo(JobVertexID jobVertexID) {
                return new DefaultVertexParallelismInfo(1, 1337, num -> {
                    return Optional.empty();
                });
            }

            public Map<JobVertexID, VertexParallelismInformation> getAllParallelismInfo() {
                return Collections.emptyMap();
            }
        }, (executionAttemptID, executionState, executionState2) -> {
        }, resultPartitionType -> {
            return false;
        }, NonAdaptiveExecutionPlanSchedulingContext.INSTANCE, log);
        defaultCheckpointStatsTracker.reportRestoredCheckpoint(42L, CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE), "foo", 1337L);
        Set set = (Set) IterableUtils.toStream(createAndRestoreExecutionGraph.getAllExecutionVertices()).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).map((v0) -> {
            return v0.getAttemptId();
        }).collect(Collectors.toSet());
        Assertions.assertThat(set).hasSize(1);
        defaultCheckpointStatsTracker.reportInitializationMetrics((ExecutionAttemptID) set.iterator().next(), new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis()).build());
        Assertions.assertThat(arrayList).hasSize(1);
    }

    private ExecutionGraphFactory createExecutionGraphFactory() {
        return createExecutionGraphFactory(UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
    }

    private ExecutionGraphFactory createExecutionGraphFactory(JobManagerJobMetricGroup jobManagerJobMetricGroup) {
        return new DefaultExecutionGraphFactory(new Configuration(), ClassLoader.getSystemClassLoader(), new DefaultExecutionDeploymentTracker(), (ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor(), EXECUTOR_EXTENSION.getExecutor(), Duration.ofMillis(0L), jobManagerJobMetricGroup, VoidBlobWriter.getInstance(), ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER, NoOpJobMasterPartitionTracker.INSTANCE);
    }

    @Nonnull
    private JobGraph createJobGraphWithSavepoint(boolean z, long j, int i) throws IOException {
        SavepointRestoreSettings forPath = SavepointRestoreSettings.forPath(TestUtils.createSavepointWithOperatorState(this.temporaryFile, j, new OperatorID()).getAbsolutePath(), z);
        JobVertex jobVertex = new JobVertex("New operator");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(forPath, jobVertex);
    }
}
