/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test that schedules a job in {@link ScheduleMode#LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST} mode
 * while only a single {@link ResourceProfile#ANY} entry is offered to the slot pool, although the
 * job vertex has a higher parallelism.
 *
 * <p>All tests share one single-threaded scheduled executor, wrapped as the
 * {@link ComponentMainThreadExecutor} on which scheduler interactions are run.
 */
public class DefaultSchedulerBatchSchedulingTest
extends TestLogger {
    /** Logger named after the runtime class of this test instance. */
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Job id used for the job graph and for every reported task execution state. */
    private static final JobID jobId = new JobID();

    /** Backing executor of {@link #mainThreadExecutor}; created and shut down once per class. */
    private static ScheduledExecutorService singleThreadScheduledExecutorService;

    /** Main-thread executor handed to the slot pool, the slot provider and the scheduler. */
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Creates the shared single-threaded executor and its main-thread adapter. */
    @BeforeClass
    public static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    /** Shuts the shared executor down if {@link #setupClass()} managed to create it. */
    @AfterClass
    public static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Runs a job with parallelism 5 against a slot pool that was offered a single resource
     * profile, finishes every submitted execution one after the other and expects the job to
     * end in {@link JobStatus#FINISHED}.
     *
     * @throws Exception if setting up the slot pool or scheduler fails, or if waiting on one of
     *     the futures fails
     */
    @Test
    public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception {
        // Single source of truth for job parallelism, queue capacity and the number of
        // executions the loop below waits for.
        final int parallelism = 5;
        final Time batchSlotTimeout = Time.milliseconds(5L);
        final JobGraph jobGraph = createJobGraph(parallelism);
        jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);

        try (SlotPoolImpl slotPool = createSlotPool(mainThreadExecutor, batchSlotTimeout)) {
            // Capacity equals the parallelism, so the non-blocking offer() below is not
            // expected to be rejected as long as each execution is submitted once.
            final ArrayBlockingQueue<ExecutionAttemptID> submittedTasksQueue = new ArrayBlockingQueue<>(parallelism);
            final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer((tdd, ignored) -> {
                    // Record every submitted execution attempt and acknowledge immediately.
                    submittedTasksQueue.offer(tdd.getExecutionAttemptId());
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();

            // Offer exactly one resource profile to the slot pool.
            SlotPoolUtils.offerSlots(
                slotPool,
                mainThreadExecutor,
                Collections.singletonList(ResourceProfile.ANY),
                new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate()));

            final SlotProvider slotProvider = createSlotProvider(slotPool, mainThreadExecutor);
            final SchedulerNG scheduler = createScheduler(jobGraph, slotProvider, batchSlotTimeout);

            final GloballyTerminalJobStatusListener jobStatusListener = new GloballyTerminalJobStatusListener();
            scheduler.registerJobStatusListener(jobStatusListener);
            startScheduling(scheduler, mainThreadExecutor);

            // Sleep for the length of the batch slot timeout before driving the executions.
            // NOTE(review): presumably so that pending batch slot requests get the chance to
            // time out; the effect of the timeout is not visible from this class.
            Thread.sleep(batchSlotTimeout.toMilliseconds());

            final CompletableFuture<JobStatus> terminationFuture = jobStatusListener.getTerminationFuture();

            for (int i = 0; i < parallelism; i++) {
                // take() blocks, so it is run asynchronously and raced against job termination.
                final CompletableFuture<ExecutionAttemptID> submittedTaskFuture =
                    CompletableFuture.supplyAsync(CheckedSupplier.unchecked(submittedTasksQueue::take));

                // Wait until either a task was submitted or the job terminated globally.
                CompletableFuture.anyOf(submittedTaskFuture, terminationFuture).join();

                if (submittedTaskFuture.isDone()) {
                    finishExecution(submittedTaskFuture.get(), scheduler, mainThreadExecutor);
                } else {
                    Assert.fail(String.format("Job reached a globally terminal state %s before all executions were finished.", terminationFuture.get()));
                }
            }

            MatcherAssert.assertThat(terminationFuture.get(), Matchers.is(JobStatus.FINISHED));
        }
    }

    /**
     * Reports the given execution attempt as {@link ExecutionState#RUNNING} and then as
     * {@link ExecutionState#FINISHED} to the scheduler, and blocks until both updates have been
     * executed on the given executor.
     *
     * @param executionAttemptId attempt whose state is updated
     * @param scheduler scheduler receiving the state updates
     * @param mainThreadExecutor executor on which the updates are run
     */
    private void finishExecution(ExecutionAttemptID executionAttemptId, SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
        CompletableFuture.runAsync(() -> {
            scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptId, ExecutionState.RUNNING));
            scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptId, ExecutionState.FINISHED));
        }, mainThreadExecutor).join();
    }

    /**
     * Creates a {@link SchedulerImpl} on top of the given slot pool, using the default
     * {@link LocationPreferenceSlotSelectionStrategy}, and starts it with the given executor.
     *
     * @param slotPool slot pool backing the slot provider
     * @param mainThreadExecutor executor passed to {@code SchedulerImpl#start}
     * @return the started slot provider
     */
    @Nonnull
    private SlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) {
        final SchedulerImpl scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        scheduler.start(mainThreadExecutor);
        return scheduler;
    }

    /**
     * Sets the main-thread executor on the scheduler and triggers
     * {@code startScheduling()} on that executor, blocking until the call has returned.
     *
     * @param scheduler scheduler to start
     * @param mainThreadExecutor executor to install and to run the start call on
     */
    private void startScheduling(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
        scheduler.setMainThreadExecutor(mainThreadExecutor);
        CompletableFuture.runAsync(scheduler::startScheduling, mainThreadExecutor).join();
    }

    /**
     * Builds a slot pool via {@link SlotPoolBuilder} with the given batch slot timeout.
     *
     * @param mainThreadExecutor executor passed to the builder
     * @param batchSlotTimeout batch slot timeout configured on the builder
     * @return the built slot pool
     * @throws Exception if building the slot pool fails
     */
    private SlotPoolImpl createSlotPool(ComponentMainThreadExecutor mainThreadExecutor, Time batchSlotTimeout) throws Exception {
        return new SlotPoolBuilder(mainThreadExecutor).setBatchSlotTimeout(batchSlotTimeout).build();
    }

    /**
     * Creates a job graph with id {@link #jobId} consisting of a single {@link NoOpInvokable}
     * vertex.
     *
     * @param parallelism parallelism of the single job vertex
     * @return the job graph
     */
    private JobGraph createJobGraph(int parallelism) {
        final JobVertex jobVertex = new JobVertex("testing task");
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(jobId, "test job", jobVertex);
    }

    /**
     * Creates the scheduler under test by delegating to
     * {@link SchedulerTestingUtils#createScheduler}.
     *
     * @param jobGraph job to schedule
     * @param slotProvider provider the scheduler requests slots from
     * @param slotRequestTimeout slot request timeout forwarded to the factory method
     * @return the scheduler
     * @throws Exception if the scheduler cannot be created
     */
    private SchedulerNG createScheduler(JobGraph jobGraph, SlotProvider slotProvider, Time slotRequestTimeout) throws Exception {
        return SchedulerTestingUtils.createScheduler(jobGraph, slotProvider, slotRequestTimeout);
    }

    /**
     * {@link JobStatusListener} that completes a future with the first globally terminal
     * {@link JobStatus} it is notified about.
     */
    private static class GloballyTerminalJobStatusListener
    implements JobStatusListener {
        /** Completed with the first globally terminal job status; later completions are no-ops. */
        private final CompletableFuture<JobStatus> globallyTerminalJobStatusFuture = new CompletableFuture<>();

        private GloballyTerminalJobStatusListener() {
        }

        /**
         * Completes the termination future when {@code newJobStatus} is globally terminal;
         * all other status changes are ignored.
         */
        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus.isGloballyTerminalState()) {
                this.globallyTerminalJobStatusFuture.complete(newJobStatus);
            }
        }

        /**
         * @return future that is completed with the globally terminal job status
         */
        public CompletableFuture<JobStatus> getTerminationFuture() {
            return this.globallyTerminalJobStatusFuture;
        }
    }
}

