package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithoutExecutionGraph;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.class */
/**
 * Scheduler state (reported as {@link JobStatus#CREATED}) in which the job waits for resources.
 *
 * <p>The state forwards resource notifications to a {@link StateTransitionManager} and exposes
 * itself to that manager as its {@link StateTransitionManager.Context}. It leaves towards
 * "creating execution graph" either when the manager invokes
 * {@link #transitionToSubsequentState()} or when the optional resource-wait timeout fires.
 */
public class WaitingForResources extends StateWithoutExecutionGraph implements ResourceListener, StateTransitionManager.Context {
    /** Scheduler-side callbacks used to query resources, schedule work and leave this state. */
    private final Context context;

    /** Handle of the scheduled resource-wait timeout; stays {@code null} for a negative timeout. */
    @Nullable
    private ScheduledFuture<?> resourceTimeoutFuture;

    /** Graph handed over unchanged to {@code goToCreatingExecutionGraph}; may be {@code null}. */
    @Nullable
    private final ExecutionGraph previousExecutionGraph;

    /** Manager notified about resource changes; created from the factory function with {@code this}. */
    private final StateTransitionManager stateTransitionManager;

    /**
     * Callbacks this state needs from its owner, in addition to those inherited from
     * {@link StateWithoutExecutionGraph.Context} and {@link StateTransitions.ToCreatingExecutionGraph}.
     */
    public interface Context extends StateWithoutExecutionGraph.Context, StateTransitions.ToCreatingExecutionGraph {
        /** Result is returned as-is by {@link WaitingForResources#hasDesiredResources()}. */
        boolean hasDesiredResources();

        /** Result is returned as-is by {@link WaitingForResources#hasSufficientResources()}. */
        boolean hasSufficientResources();

        /**
         * Schedules {@code runnable} with the given delay on behalf of {@code state}.
         *
         * <p>NOTE(review): judging by the name, the runnable is presumably skipped when
         * {@code state} is no longer the current state - confirm against the implementation.
         *
         * @return handle that can be used to cancel the scheduled call
         */
        ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration);
    }

    /** Factory that stores the constructor arguments and builds a new state on demand. */
    static class Factory implements StateFactory<WaitingForResources> {
        private final Context context;
        private final Logger log;
        private final Duration submissionResourceWaitTimeout;

        @Nullable
        private final ExecutionGraph previousExecutionGraph;
        private final Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory;

        /**
         * @param context callbacks passed to every created state
         * @param logger logger passed to every created state
         * @param duration resource-wait timeout passed to every created state
         * @param function creates the {@link StateTransitionManager} for a created state
         * @param executionGraph previous execution graph to hand over, or {@code null}
         */
        public Factory(Context context, Logger logger, Duration duration, Function<StateTransitionManager.Context, StateTransitionManager> function, @Nullable ExecutionGraph executionGraph) {
            this.context = context;
            this.log = logger;
            this.submissionResourceWaitTimeout = duration;
            this.previousExecutionGraph = executionGraph;
            this.stateTransitionManagerFactory = function;
        }

        @Override
        public Class<WaitingForResources> getStateClass() {
            return WaitingForResources.class;
        }

        /** Creates a fresh {@link WaitingForResources} from the stored arguments on every call. */
        @Override
        public WaitingForResources getState() {
            return new WaitingForResources(this.context, this.log, this.submissionResourceWaitTimeout, this.previousExecutionGraph, this.stateTransitionManagerFactory);
        }
    }

    /** Convenience constructor without a previous execution graph. */
    @VisibleForTesting
    WaitingForResources(Context context, Logger logger, Duration duration, Function<StateTransitionManager.Context, StateTransitionManager> function) {
        this(context, logger, duration, null, function);
    }

    /**
     * Creates the state, builds its {@link StateTransitionManager} and schedules the initial work.
     *
     * @param context owner callbacks; must not be {@code null}
     * @param logger logger forwarded to the super class
     * @param duration resource-wait timeout; a negative value disables the timeout; must not be
     *     {@code null}
     * @param executionGraph graph forwarded on transition, or {@code null}
     * @param function creates the transition manager from this state; must not be {@code null}
     * @throws NullPointerException if {@code context}, {@code duration} or {@code function} is
     *     {@code null}
     */
    WaitingForResources(Context context, Logger logger, Duration duration, @Nullable ExecutionGraph executionGraph, Function<StateTransitionManager.Context, StateTransitionManager> function) {
        super(context, logger);
        this.context = Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(duration);
        Preconditions.checkNotNull(function);
        // Assign every plain field before `this` is handed to the factory or the scheduler, so
        // that a callback which runs earlier than expected never observes a default value.
        this.previousExecutionGraph = executionGraph;
        this.stateTransitionManager = function.apply(this);
        // Both calls below are scheduled through the context rather than invoked inline.
        if (!duration.isNegative()) {
            this.resourceTimeoutFuture = context.runIfState(this, this::resourceTimeout, duration);
        }
        context.runIfState(this, this::checkPotentialStateTransition, Duration.ZERO);
    }

    /** Cancels a pending timeout (without interrupting), closes the manager, then delegates. */
    @Override
    public void onLeave(Class<? extends State> cls) {
        if (this.resourceTimeoutFuture != null) {
            this.resourceTimeoutFuture.cancel(false);
        }
        this.stateTransitionManager.close();
        super.onLeave(cls);
    }

    @Override
    public JobStatus getJobStatus() {
        return JobStatus.CREATED;
    }

    @Override
    public void onNewResourcesAvailable() {
        checkPotentialStateTransition();
    }

    @Override
    public void onNewResourceRequirements() {
        checkPotentialStateTransition();
    }

    /** Notifies the manager of a change and immediately triggers its evaluation. */
    private void checkPotentialStateTransition() {
        this.stateTransitionManager.onChange();
        this.stateTransitionManager.onTrigger();
    }

    /** Timeout callback: stop waiting and move on with whatever resources are available. */
    private void resourceTimeout() {
        getLogger().debug("Initial resource allocation timeout triggered: Creating ExecutionGraph with available resources.");
        transitionToSubsequentState();
    }

    @Override
    public boolean hasSufficientResources() {
        return this.context.hasSufficientResources();
    }

    @Override
    public boolean hasDesiredResources() {
        return this.context.hasDesiredResources();
    }

    /** Asks the context to go to creating an execution graph, passing the previous graph along. */
    @Override
    public void transitionToSubsequentState() {
        this.context.goToCreatingExecutionGraph(this.previousExecutionGraph);
    }

    /** Schedules {@code runnable} via the context, bound to this state instance. */
    @Override
    public ScheduledFuture<?> scheduleOperation(Runnable runnable, Duration duration) {
        return this.context.runIfState(this, runnable, duration);
    }
}
