package org.apache.flink.test.scheduling;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/test/scheduling/UpdateJobResourceRequirementsITCase.class */
public class UpdateJobResourceRequirementsITCase {
    private static final int NUMBER_OF_SLOTS = 4;

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(createConfiguration()).setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS).build());
    private RestClusterClient<?> restClusterClient;

    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(WebOptions.REFRESH_INTERVAL, 50L);
        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, 50L);
        return configuration;
    }

    @BeforeEach
    void beforeEach(@InjectClusterClient RestClusterClient<?> restClusterClient) {
        this.restClusterClient = restClusterClient;
    }

    @Test
    void testManualUpScalingWithNewSlotAllocation() throws Exception {
        JobVertex jobVertex = new JobVertex("Single operator");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        runRescalingTest(JobGraphTestUtils.streamingJobGraph(new JobVertex[]{jobVertex}), JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 2).build(), 1, 2, 2);
    }

    @Test
    void testManualUpScalingWithNoNewSlotAllocation() throws Exception {
        JobVertex jobVertex = new JobVertex("Operator1");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("Operator2");
        jobVertex2.setParallelism(2);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        runRescalingTest(JobGraphTestUtils.streamingJobGraph(new JobVertex[]{jobVertex, jobVertex2}), JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 2).setParallelismForJobVertex(jobVertex2.getID(), 1, 2).build(), 3, NUMBER_OF_SLOTS, 2);
    }

    @Test
    void testManualUpScalingWithDifferentSlotSharingGroups() throws Exception {
        JobVertex jobVertex = new JobVertex("Operator1");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("Operator2");
        jobVertex2.setParallelism(1);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        jobVertex.setSlotSharingGroup(new SlotSharingGroup());
        jobVertex2.setSlotSharingGroup(new SlotSharingGroup());
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(new JobVertex[]{jobVertex, jobVertex2});
        runRescalingTest(streamingJobGraph, JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 2).setParallelismForJobVertex(jobVertex2.getID(), 1, 2).build(), 2, NUMBER_OF_SLOTS, NUMBER_OF_SLOTS - (streamingJobGraph.getSlotSharingGroups().size() * 2));
    }

    @Test
    void testManualDownScaling() throws Exception {
        JobVertex jobVertex = new JobVertex("Operator");
        jobVertex.setParallelism(2);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        runRescalingTest(JobGraphTestUtils.streamingJobGraph(new JobVertex[]{jobVertex}), JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 1).build(), 2, 1, 3);
    }

    private void runRescalingTest(JobGraph jobGraph, JobResourceRequirements jobResourceRequirements, int i, int i2, int i3) throws Exception {
        this.restClusterClient.submitJob(jobGraph).join();
        try {
            JobID jobID = jobGraph.getJobID();
            waitForRunningTasks(this.restClusterClient, jobID, i);
            this.restClusterClient.updateJobResourceRequirements(jobID, jobResourceRequirements).join();
            waitForRunningTasks(this.restClusterClient, jobID, i2);
            waitForAvailableSlots(this.restClusterClient, i3);
            this.restClusterClient.cancel(jobGraph.getJobID()).join();
        } catch (Throwable th) {
            this.restClusterClient.cancel(jobGraph.getJobID()).join();
            throw th;
        }
    }

    private static int getNumberRunningTasks(RestClusterClient<?> restClusterClient, JobID jobID) {
        return ((JobDetailsInfo) restClusterClient.getJobDetails(jobID).join()).getJobVertexInfos().stream().map((v0) -> {
            return v0.getTasksPerState();
        }).map(map -> {
            return (Integer) map.get(ExecutionState.RUNNING);
        }).mapToInt((v0) -> {
            return v0.intValue();
        }).sum();
    }

    public static void waitForRunningTasks(RestClusterClient<?> restClusterClient, JobID jobID, int i) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(getNumberRunningTasks(restClusterClient, jobID) == i);
        });
    }

    public static void waitForAvailableSlots(RestClusterClient<?> restClusterClient, int i) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(((ClusterOverviewWithVersion) restClusterClient.getClusterOverview().join()).getNumSlotsAvailable() == i);
        });
    }
}
