/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith(value={TestLoggerExtension.class})
public class UpdateJobResourceRequirementsITCase {
    private static final int NUMBER_OF_SLOTS = 4;
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(UpdateJobResourceRequirementsITCase.createConfiguration()).setNumberSlotsPerTaskManager(4).build());
    private RestClusterClient<?> restClusterClient;

    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(WebOptions.REFRESH_INTERVAL, (Object)50L);
        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, (Object)50L);
        return configuration;
    }

    @BeforeEach
    void beforeEach(@InjectClusterClient RestClusterClient<?> restClusterClient) {
        this.restClusterClient = restClusterClient;
    }

    @Test
    void testManualUpScalingWithNewSlotAllocation() throws Exception {
        JobVertex jobVertex = new JobVertex("Single operator");
        boolean initialParallelism = true;
        int parallelismAfterRescaling = 2;
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{jobVertex});
        this.runRescalingTest(jobGraph, JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 2).build(), 1, 2, 2);
    }

    @Test
    void testManualUpScalingWithNoNewSlotAllocation() throws Exception {
        int initialRunningTasks = 3;
        int runningTasksAfterRescale = 4;
        JobVertex jobVertex1 = new JobVertex("Operator1");
        jobVertex1.setParallelism(1);
        jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("Operator2");
        jobVertex2.setParallelism(2);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex1.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{jobVertex1, jobVertex2});
        this.runRescalingTest(jobGraph, JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex1.getID(), 1, 2).setParallelismForJobVertex(jobVertex2.getID(), 1, 2).build(), 3, 4, 2);
    }

    @Test
    void testManualUpScalingWithDifferentSlotSharingGroups() throws Exception {
        boolean initialParallelism = true;
        int desiredParallelism = 2;
        int initialRunningTasks = 2;
        int runningTasksAfterRescale = 4;
        JobVertex jobVertex1 = new JobVertex("Operator1");
        jobVertex1.setParallelism(1);
        jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("Operator2");
        jobVertex2.setParallelism(1);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        jobVertex1.setSlotSharingGroup(new SlotSharingGroup());
        jobVertex2.setSlotSharingGroup(new SlotSharingGroup());
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{jobVertex1, jobVertex2});
        this.runRescalingTest(jobGraph, JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex1.getID(), 1, 2).setParallelismForJobVertex(jobVertex2.getID(), 1, 2).build(), 2, 4, 4 - jobGraph.getSlotSharingGroups().size() * 2);
    }

    @Test
    void testManualDownScaling() throws Exception {
        int initialRunningTasks = 2;
        boolean runningTasksAfterRescale = true;
        JobVertex jobVertex = new JobVertex("Operator");
        jobVertex.setParallelism(2);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{jobVertex});
        this.runRescalingTest(jobGraph, JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 1).build(), 2, 1, 3);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runRescalingTest(JobGraph jobGraph, JobResourceRequirements newJobVertexParallelism, int initialRunningTasks, int runningTasksAfterRescale, int freeSlotsAfterRescale) throws Exception {
        this.restClusterClient.submitJob(jobGraph).join();
        try {
            JobID jobId = jobGraph.getJobID();
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(this.restClusterClient, jobId, initialRunningTasks);
            this.restClusterClient.updateJobResourceRequirements(jobId, newJobVertexParallelism).join();
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(this.restClusterClient, jobId, runningTasksAfterRescale);
            UpdateJobResourceRequirementsITCase.waitForAvailableSlots(this.restClusterClient, freeSlotsAfterRescale);
        }
        finally {
            this.restClusterClient.cancel(jobGraph.getJobID()).join();
        }
    }

    private static int getNumberRunningTasks(RestClusterClient<?> restClusterClient, JobID jobId) {
        JobDetailsInfo jobDetailsInfo = (JobDetailsInfo)restClusterClient.getJobDetails(jobId).join();
        return jobDetailsInfo.getJobVertexInfos().stream().map(JobDetailsInfo.JobVertexDetailsInfo::getTasksPerState).map(tasksPerState -> (Integer)tasksPerState.get(ExecutionState.RUNNING)).mapToInt(Integer::intValue).sum();
    }

    public static void waitForRunningTasks(RestClusterClient<?> restClusterClient, JobID jobId, int desiredNumberOfRunningTasks) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            int numberOfRunningTasks = UpdateJobResourceRequirementsITCase.getNumberRunningTasks(restClusterClient, jobId);
            return numberOfRunningTasks == desiredNumberOfRunningTasks;
        });
    }

    public static void waitForAvailableSlots(RestClusterClient<?> restClusterClient, int desiredNumberOfAvailableSlots) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            ClusterOverviewWithVersion clusterOverview = (ClusterOverviewWithVersion)restClusterClient.getClusterOverview().join();
            return clusterOverview.getNumSlotsAvailable() == desiredNumberOfAvailableSlots;
        });
    }
}

