/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;

public abstract class SchedulerTestBase
extends TestLogger {
    protected TestingSlotPoolSlotProvider testingSlotProvider;
    private TestingSlotPoolImpl slotPool;
    private Scheduler scheduler;
    private ComponentMainThreadExecutor componentMainThreadExecutor;

    @Before
    public void setup() throws Exception {
        JobID jobId = new JobID();
        this.slotPool = new TestingSlotPoolImpl(jobId);
        this.scheduler = new SchedulerImpl((SlotSelectionStrategy)LocationPreferenceSlotSelectionStrategy.createDefault(), (SlotPool)this.slotPool);
        this.testingSlotProvider = new TestingSlotPoolSlotProvider();
        JobMasterId jobMasterId = JobMasterId.generate();
        String jobManagerAddress = "localhost";
        this.componentMainThreadExecutor = this.getComponentMainThreadExecutor();
        this.slotPool.start(jobMasterId, "localhost", this.componentMainThreadExecutor);
        this.scheduler.start(this.componentMainThreadExecutor);
    }

    protected abstract ComponentMainThreadExecutor getComponentMainThreadExecutor();

    @After
    public void teardown() throws Exception {
        if (this.testingSlotProvider != null) {
            this.testingSlotProvider.shutdown();
            this.testingSlotProvider = null;
        }
    }

    protected final <T> T supplyInMainThreadExecutor(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, (Executor)this.componentMainThreadExecutor).join();
    }

    protected final void runInMainThreadExecutor(Runnable runnable) {
        CompletableFuture.runAsync(runnable, (Executor)this.componentMainThreadExecutor).join();
    }

    protected final class TestingSlotPoolSlotProvider
    implements SlotProvider {
        private final AtomicInteger numberOfLocalizedAssignments = new AtomicInteger();
        private final AtomicInteger numberOfNonLocalizedAssignments = new AtomicInteger();
        private final AtomicInteger numberOfUnconstrainedAssignments = new AtomicInteger();
        private final AtomicInteger numberOfHostLocalizedAssignments = new AtomicInteger();

        private TestingSlotPoolSlotProvider() {
        }

        public TaskManagerLocation addTaskManager(int numberSlots) {
            Collection acceptedSlotOffers;
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            ResourceID resourceId = taskManagerLocation.getResourceID();
            try {
                SchedulerTestBase.this.supplyInMainThreadExecutor(() -> SchedulerTestBase.this.slotPool.registerTaskManager(resourceId));
            }
            catch (Exception e) {
                throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
            }
            SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
            ArrayList<SlotOffer> slotOffers = new ArrayList<SlotOffer>(numberSlots);
            for (int i = 0; i < numberSlots; ++i) {
                SlotOffer slotOffer = new SlotOffer(new AllocationID(), i, ResourceProfile.ANY);
                slotOffers.add(slotOffer);
            }
            try {
                acceptedSlotOffers = SchedulerTestBase.this.supplyInMainThreadExecutor(() -> SchedulerTestBase.this.slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers));
            }
            catch (Exception e) {
                throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
            }
            Preconditions.checkState((acceptedSlotOffers.size() == numberSlots ? 1 : 0) != 0);
            return taskManagerLocation;
        }

        public void releaseTaskManager(ResourceID resourceId) {
            try {
                SchedulerTestBase.this.supplyInMainThreadExecutor(() -> SchedulerTestBase.this.slotPool.releaseTaskManager(resourceId, null));
            }
            catch (Exception e) {
                throw new RuntimeException("Should not have happened.", e);
            }
        }

        public int getNumberOfAvailableSlots() {
            return SchedulerTestBase.this.supplyInMainThreadExecutor(() -> SchedulerTestBase.this.slotPool.getAvailableSlotsInformation().size());
        }

        public int getNumberOfLocalizedAssignments() {
            return this.numberOfLocalizedAssignments.get();
        }

        public int getNumberOfNonLocalizedAssignments() {
            return this.numberOfNonLocalizedAssignments.get();
        }

        public int getNumberOfUnconstrainedAssignments() {
            return this.numberOfUnconstrainedAssignments.get();
        }

        public int getNumberOfHostLocalizedAssignments() {
            return this.numberOfHostLocalizedAssignments.get();
        }

        public void shutdown() {
            SchedulerTestBase.this.runInMainThreadExecutor(() -> SchedulerTestBase.this.slotPool.close());
        }

        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, Time allocationTimeout) {
            return SchedulerTestBase.this.supplyInMainThreadExecutor(() -> SchedulerTestBase.this.scheduler.allocateSlot(task, slotProfile, allocationTimeout).thenApply(logicalSlot -> {
                switch (logicalSlot.getLocality()) {
                    case LOCAL: {
                        this.numberOfLocalizedAssignments.incrementAndGet();
                        break;
                    }
                    case UNCONSTRAINED: {
                        this.numberOfUnconstrainedAssignments.incrementAndGet();
                        break;
                    }
                    case NON_LOCAL: {
                        this.numberOfNonLocalizedAssignments.incrementAndGet();
                        break;
                    }
                    case HOST_LOCAL: {
                        this.numberOfHostLocalizedAssignments.incrementAndGet();
                        break;
                    }
                }
                return logicalSlot;
            }));
        }

        public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
        }

        public TestingSlotPoolImpl getSlotPool() {
            return SchedulerTestBase.this.slotPool;
        }
    }
}

