/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
class UpdateJobResourceRequirementsRecoveryITCase {
    private static final Logger LOG = LoggerFactory.getLogger(UpdateJobResourceRequirementsRecoveryITCase.class);
    @RegisterExtension
    private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION = new AllCallbackWrapper((CustomExtension)new ZooKeeperExtension());

    UpdateJobResourceRequirementsRecoveryITCase() {
    }

    @Test
    void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("operator");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{jobVertex});
        JobID jobId = jobGraph.getJobID();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)Integer.MAX_VALUE);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(100L));
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(HighAvailabilityOptions.HA_MODE, (Object)"zookeeper");
        configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)((ZooKeeperExtension)ZOOKEEPER_EXTENSION.getCustomExtension()).getConnectString());
        configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)tmpFolder.toFile().getAbsolutePath());
        MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(2).build();
        RestClusterClient restClusterClient = new RestClusterClient(configuration, (Object)"foobar");
        MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
        miniCluster.start();
        FlinkAssertions.assertThatFuture((CompletableFuture)restClusterClient.submitJob(jobGraph)).eventuallySucceeds();
        ClientUtils.waitUntilJobInitializationFinished(() -> (JobStatus)restClusterClient.getJobStatus(jobId).get(), () -> (JobResult)restClusterClient.requestJobResult(jobId).get(), (ClassLoader)this.getClass().getClassLoader());
        FlinkAssertions.assertThatFuture((CompletableFuture)restClusterClient.updateJobResourceRequirements(jobGraph.getJobID(), JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertex.getID(), 1, 2).build())).eventuallySucceeds();
        FlinkAssertions.assertThatFuture((CompletableFuture)miniCluster.closeAsyncWithoutCleaningHighAvailabilityData()).eventuallySucceeds();
        LOG.info("Start second mini cluster to recover the persisted job.");
        try (MiniCluster recoveredMiniCluster = new MiniCluster(miniClusterConfiguration);){
            recoveredMiniCluster.start();
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobId, 2);
        }
    }
}

