package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.class */
public class AdaptiveSchedulerSimpleITCase extends TestLogger {
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int PARALLELISM = 10;
    private static final Configuration configuration = getConfiguration();

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase$AlwaysFailingInvokable.class */
    public static final class AlwaysFailingInvokable extends AbstractInvokable {
        public AlwaysFailingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            throw new FlinkRuntimeException("Test failure.");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase$OnceFailingInvokable.class */
    public static final class OnceFailingInvokable extends AbstractInvokable {
        private static volatile boolean hasFailed = false;

        public OnceFailingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            if (hasFailed || getIndexInSubtaskGroup() != 0) {
                return;
            }
            hasFailed = true;
            throw new FlinkRuntimeException("Test failure.");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void reset() {
            hasFailed = false;
        }
    }

    private static Configuration getConfiguration() {
        Configuration configuration2 = new Configuration();
        configuration2.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        configuration2.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
        return configuration2;
    }

    @Test
    public void testSchedulingOfSimpleJob() throws Exception {
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        JobGraph createJobGraph = createJobGraph();
        miniCluster.submitJob(createJobGraph).join();
        JobResult jobResult = (JobResult) miniCluster.requestJobResult(createJobGraph.getJobID()).join();
        jobResult.toJobExecutionResult(getClass().getClassLoader());
        Assert.assertTrue(jobResult.isSuccess());
    }

    private JobGraph createJobGraph() {
        JobVertex jobVertex = new JobVertex("Source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(10);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(10);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
    }

    @Test
    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        JobVertex jobVertex = new JobVertex("Always failing operator");
        jobVertex.setInvokableClass(AlwaysFailingInvokable.class);
        jobVertex.setParallelism(1);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10000L));
        streamingJobGraph.setExecutionConfig(executionConfig);
        miniCluster.submitJob(streamingJobGraph).join();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(miniCluster.getJobStatus(streamingJobGraph.getJobID()).get() == JobStatus.RESTARTING);
        }, Deadline.fromNow(Duration.of(10000L, ChronoUnit.MILLIS)), 5L);
        miniCluster.cancelJob(streamingJobGraph.getJobID()).get();
    }

    @Test
    public void testGlobalFailoverIfTaskFails() throws Throwable {
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        JobGraph createOnceFailingJobGraph = createOnceFailingJobGraph();
        miniCluster.submitJob(createOnceFailingJobGraph).join();
        JobResult jobResult = (JobResult) miniCluster.requestJobResult(createOnceFailingJobGraph.getJobID()).join();
        if (!jobResult.isSuccess()) {
            throw ((SerializedThrowable) jobResult.getSerializedThrowable().get()).deserializeError(ClassLoader.getSystemClassLoader());
        }
    }

    private JobGraph createOnceFailingJobGraph() throws IOException {
        JobVertex jobVertex = new JobVertex("Once failing operator");
        OnceFailingInvokable.reset();
        jobVertex.setInvokableClass(OnceFailingInvokable.class);
        jobVertex.setParallelism(1);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        streamingJobGraph.setExecutionConfig(executionConfig);
        return streamingJobGraph;
    }
}
