package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.class */
class WaitingForResourcesTest {
    // Logger handed to every WaitingForResources instance constructed by the tests below.
    private static final Logger LOG = LoggerFactory.getLogger(WaitingForResourcesTest.class);
    // One second; passed as the last argument of the four-argument WaitingForResources
    // constructor by the tests that do not choose their own stabilization timeout.
    private static final Duration STABILIZATION_TIMEOUT = Duration.ofSeconds(1);

    // Mock context used by every test. It is registered as a JUnit 5 extension, and
    // MockContext#afterEach closes its state validator in addition to the superclass hook.
    @RegisterExtension
    MockContext ctx = new MockContext();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest$ManualTestTime.class */
    /**
     * Manually driven test time. Wraps a {@link ManualClock}, keeps track of the total time
     * advanced so far and notifies a callback after every advance.
     */
    public static final class ManualTestTime {
        private static final Logger LOG = LoggerFactory.getLogger(ManualTestTime.class);

        /** Clock that is moved forward by {@link #advanceMillis(long)}. */
        private final ManualClock testingClock = new ManualClock();

        /** Callback invoked after each advance with the total elapsed test time. */
        private final Consumer<Duration> runOnAdvance;

        /** Sum of all advances performed through {@link #advanceMillis(long)}. */
        private Duration durationSinceTestStart = Duration.ZERO;

        /**
         * @param runOnAdvance callback receiving the accumulated test duration after each advance
         */
        private ManualTestTime(Consumer<Duration> runOnAdvance) {
            this.runOnAdvance = runOnAdvance;
        }

        /** Returns the manually controlled clock backing this test time. */
        public Clock getClock() {
            return testingClock;
        }

        /**
         * Moves the test time forward.
         *
         * <p>The accumulated duration is updated first, then the clock is advanced, and finally
         * the callback is invoked with the accumulated duration.
         *
         * @param millis number of milliseconds to advance by
         */
        public void advanceMillis(long millis) {
            durationSinceTestStart = durationSinceTestStart.plusMillis(millis);
            final long elapsedMillis = durationSinceTestStart.toMillis();
            LOG.info("Advance testing time by {} ms to time {} ms", millis, elapsedMillis);

            testingClock.advanceTime(millis, TimeUnit.MILLISECONDS);
            runOnAdvance.accept(durationSinceTestStart);
        }

        /** Returns the total time advanced since this instance was created. */
        public Duration getTestDuration() {
            return durationSinceTestStart;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest$MockContext.class */
    /**
     * Test double for {@link WaitingForResources.Context}.
     *
     * <p>Work submitted through {@link #runIfState(State, Runnable, Duration)} is not executed
     * by a real executor. It is queued, ordered by its target time, and only run when the test
     * advances the manual time or calls one of the {@code runScheduledTasks} methods.
     */
    private static class MockContext extends MockStateWithoutExecutionGraphContext
            implements WaitingForResources.Context {
        private static final Logger LOG = LoggerFactory.getLogger(MockContext.class);

        /** Validates (expected or unexpected) calls to {@link #goToCreatingExecutionGraph}. */
        private final StateValidator<Void> creatingExecutionGraphStateValidator;

        private Supplier<Boolean> hasDesiredResourcesSupplier;
        private Supplier<Boolean> hasSufficientResourcesSupplier;

        /** Pending tasks; the head is the task with the smallest target time. */
        private final Queue<ScheduledTask<Void>> scheduledTasks;

        private final ManualTestTime testTime;

        private MockContext() {
            // The validator guards goToCreatingExecutionGraph, so it is labelled accordingly
            // (the previous label "executing" made validator diagnostics misleading).
            this.creatingExecutionGraphStateValidator =
                    new StateValidator<>("creatingExecutionGraph");
            this.hasDesiredResourcesSupplier = () -> false;
            this.hasSufficientResourcesSupplier = () -> false;
            // Typed queue and typed lambda parameter: with a raw PriorityQueue the comparator's
            // lambda parameter is Object and getDelay(...) cannot be resolved.
            this.scheduledTasks =
                    new PriorityQueue<>(
                            Comparator.comparingLong(
                                    (ScheduledTask<Void> task) ->
                                            task.getDelay(TimeUnit.MILLISECONDS)));
            // Every time advance runs all tasks that are due at the accumulated test time.
            this.testTime = new ManualTestTime(elapsed -> runScheduledTasks(elapsed.toMillis()));
        }

        /** Replaces the supplier consulted by {@link #hasDesiredResources()}. */
        public void setHasDesiredResources(Supplier<Boolean> supplier) {
            this.hasDesiredResourcesSupplier = supplier;
        }

        /** Replaces the supplier consulted by {@link #hasSufficientResources()}. */
        public void setHasSufficientResources(Supplier<Boolean> supplier) {
            this.hasSufficientResourcesSupplier = supplier;
        }

        /** Registers the expectation of a call to {@link #goToCreatingExecutionGraph}. */
        void setExpectCreatingExecutionGraph() {
            creatingExecutionGraphStateValidator.expectInput(ignored -> {});
        }

        /**
         * Runs, in queue order, every pending task whose delay value is at most {@code
         * untilDelay} milliseconds. Tasks that get scheduled while running are considered as
         * well, because the queue head is re-read after each execution.
         *
         * <p>NOTE(review): the delay value of a task is the clock time at scheduling plus the
         * requested delay (see {@link #runIfState}); comparing it against an elapsed duration
         * presumably relies on the manual clock starting at 0 - confirm.
         *
         * @param untilDelay inclusive upper bound in milliseconds
         */
        void runScheduledTasks(long untilDelay) {
            LOG.info("Running scheduled tasks with a delay between 0 and {}ms:", untilDelay);
            ScheduledTask<Void> head = scheduledTasks.peek();
            while (head != null && head.getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
                // Remove before executing so that the task may schedule follow-up work.
                final ScheduledTask<Void> task = scheduledTasks.poll();
                LOG.info("Running task with delay {}", task.getDelay(TimeUnit.MILLISECONDS));
                task.execute();
                if (task.isPeriodic()) {
                    // NOTE(review): runIfState only creates tasks via the two-argument
                    // constructor; presumably those are never periodic - confirm.
                    scheduledTasks.add(task);
                }
                head = scheduledTasks.peek();
            }
        }

        /** Runs every pending task regardless of its delay. */
        void runScheduledTasks() {
            runScheduledTasks(Long.MAX_VALUE);
        }

        @Override
        public void afterEach(ExtensionContext extensionContext) throws Exception {
            super.afterEach(extensionContext);
            creatingExecutionGraphStateValidator.close();
        }

        public boolean hasDesiredResources() {
            return hasDesiredResourcesSupplier.get();
        }

        public boolean hasSufficientResources() {
            return hasSufficientResourcesSupplier.get();
        }

        /**
         * Queues {@code runnable} for later execution by {@code runScheduledTasks}.
         *
         * <p>The {@code state} argument is not inspected; instead the runnable is skipped if a
         * state transition has already been registered when the task is executed.
         *
         * @param state ignored by this mock
         * @param runnable work to run
         * @param duration delay relative to the current time of the manual clock
         * @return the queued task
         */
        public ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration) {
            final long delayMillis = duration.toMillis();
            final long executionTime = testTime.getClock().absoluteTimeMillis() + delayMillis;
            LOG.info(
                    "Scheduling work with delay {} for earliest execution at {}",
                    delayMillis,
                    executionTime);
            final ScheduledTask<Void> scheduledTask =
                    new ScheduledTask<>(
                            () -> {
                                if (!hasStateTransition()) {
                                    runnable.run();
                                }
                                return null;
                            },
                            executionTime);
            scheduledTasks.add(scheduledTask);
            return scheduledTask;
        }

        /**
         * Checks the call against the registered expectation and records the state transition.
         * The passed execution graph is not used.
         */
        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph executionGraph) {
            creatingExecutionGraphStateValidator.validateInput(null);
            registerStateTransition();
        }

        /** Returns the manual clock driving this context. */
        public Clock getClock() {
            return testTime.getClock();
        }

        /** Advances the manual time, which also runs the tasks that became due. */
        public void advanceTimeByMillis(long millis) {
            testTime.advanceMillis(millis);
        }

        /** Returns the total time advanced so far. */
        public Duration getTestDuration() {
            return testTime.getTestDuration();
        }
    }

    /** Explicit no-arg constructor; there is nothing to set up beyond the field initializers. */
    WaitingForResourcesTest() {}

    /**
     * With the desired resources reported as available, running the scheduled work is expected
     * to end in a call to {@code goToCreatingExecutionGraph}.
     */
    @Test
    void testTransitionToCreatingExecutionGraph() {
        ctx.setHasDesiredResources(() -> true);
        ctx.setExpectCreatingExecutionGraph();

        // Only the side effects of constructing the state matter here.
        new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);

        ctx.runScheduledTasks();
    }

    /**
     * Without the desired resources, a resource notification must not trigger a transition: no
     * expectation is registered, so the mock context would reject one.
     */
    @Test
    void testNotEnoughResources() {
        ctx.setHasDesiredResources(() -> false);

        final WaitingForResources state =
                new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);

        state.onNewResourcesAvailable();
    }

    /**
     * The state is created while resources are missing; once they are reported as available,
     * the notification is expected to lead to {@code goToCreatingExecutionGraph}.
     */
    @Test
    void testNotifyNewResourcesAvailable() {
        ctx.setHasDesiredResources(() -> false);
        final WaitingForResources state =
                new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);

        // Resources show up only after the state has been created.
        ctx.setHasDesiredResources(() -> true);
        ctx.setExpectCreatingExecutionGraph();

        state.onNewResourcesAvailable();
    }

    /**
     * Sufficient (but not desired) resources combined with a zero stabilization timeout: the
     * notification is expected to lead to {@code goToCreatingExecutionGraph}.
     */
    @Test
    void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() {
        final Duration resourceTimeout = Duration.ofSeconds(1000L);
        final Duration stabilizationTimeout = Duration.ofMillis(0L);
        final WaitingForResources state =
                new WaitingForResources(ctx, LOG, resourceTimeout, stabilizationTimeout);

        ctx.setHasDesiredResources(() -> false);
        ctx.setHasSufficientResources(() -> true);
        ctx.setExpectCreatingExecutionGraph();

        state.onNewResourcesAvailable();
    }

    /**
     * Sufficient (but not desired) resources with a 50 second stabilization timeout: the
     * notification alone must not register a state transition.
     */
    @Test
    void testNoSchedulingIfStabilizationTimeoutIsConfigured() {
        final Duration resourceTimeout = Duration.ofSeconds(1000L);
        final Duration stabilizationTimeout = Duration.ofMillis(50000L);
        final WaitingForResources state =
                new WaitingForResources(ctx, LOG, resourceTimeout, stabilizationTimeout);

        ctx.setHasDesiredResources(() -> false);
        ctx.setHasSufficientResources(() -> true);

        state.onNewResourcesAvailable();

        final boolean transitioned = ctx.hasStateTransition();
        Assertions.assertThat(transitioned).isFalse();
    }

    /**
     * Sufficient resources with a 50 second stabilization timeout: after advancing the manual
     * time just past that timeout, a state transition must have been registered.
     */
    @Test
    void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() {
        final Duration resourceTimeout = Duration.ofMillis(-1L);
        final Duration stabilizationTimeout = Duration.ofMillis(50000L);
        final WaitingForResources state =
                new WaitingForResources(
                        ctx,
                        LOG,
                        resourceTimeout,
                        stabilizationTimeout,
                        ctx.getClock(),
                        (ExecutionGraph) null);

        ctx.setHasDesiredResources(() -> false);
        ctx.setHasSufficientResources(() -> true);
        state.onNewResourcesAvailable();

        ctx.setExpectCreatingExecutionGraph();

        // One millisecond beyond the stabilization timeout.
        final long afterStabilizationMillis = stabilizationTimeout.plusMillis(1L).toMillis();
        ctx.advanceTimeByMillis(afterStabilizationMillis);
        ctx.runScheduledTasks(afterStabilizationMillis);

        Assertions.assertThat(ctx.hasStateTransition()).isTrue();
    }

    /**
     * Alternates between sufficient and insufficient resources in 40 ms steps with a 50 ms
     * stabilization timeout. Although more than 50 ms pass in total, no transition may happen
     * until a further full timeout has elapsed after the last notification.
     */
    @Test
    void testStabilizationTimeoutReset() {
        final Duration resourceTimeout = Duration.ofMillis(-1L);
        final Duration stabilizationTimeout = Duration.ofMillis(50L);
        final WaitingForResources state =
                new WaitingForResources(
                        ctx,
                        LOG,
                        resourceTimeout,
                        stabilizationTimeout,
                        ctx.getClock(),
                        (ExecutionGraph) null);

        ctx.setHasDesiredResources(() -> false);

        // t = 40 ms: sufficient resources are available.
        ctx.setHasSufficientResources(() -> true);
        ctx.advanceTimeByMillis(40L);
        state.onNewResourcesAvailable();

        // t = 80 ms: resources are no longer sufficient.
        ctx.setHasSufficientResources(() -> false);
        ctx.advanceTimeByMillis(40L);
        state.onNewResourcesAvailable();

        // t = 120 ms: sufficient resources are back.
        ctx.setHasSufficientResources(() -> true);
        ctx.advanceTimeByMillis(40L);
        state.onNewResourcesAvailable();

        // More time than the stabilization timeout has passed overall, yet nothing happened.
        Assertions.assertThat(ctx.hasStateTransition()).isFalse();
        Assertions.assertThat(ctx.getTestDuration()).isGreaterThan(stabilizationTimeout);

        ctx.setExpectCreatingExecutionGraph();

        ctx.advanceTimeByMillis(1L);
        Assertions.assertThat(ctx.hasStateTransition()).isFalse();

        ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
        Assertions.assertThat(ctx.hasStateTransition()).isTrue();
    }

    /**
     * With a resource timeout of -1 ms and no desired resources, running all scheduled work
     * must not register a state transition.
     */
    @Test
    void testNoStateTransitionOnNoResourceTimeout() {
        ctx.setHasDesiredResources(() -> false);

        final Duration resourceTimeout = Duration.ofMillis(-1L);
        new WaitingForResources(ctx, LOG, resourceTimeout, STABILIZATION_TIMEOUT);

        ctx.runScheduledTasks();

        final boolean transitioned = ctx.hasStateTransition();
        Assertions.assertThat(transitioned).isFalse();
    }

    /**
     * With a zero resource timeout and no desired resources, running the scheduled work is
     * expected to end in a call to {@code goToCreatingExecutionGraph}.
     */
    @Test
    void testStateTransitionOnResourceTimeout() {
        ctx.setHasDesiredResources(() -> false);

        // Only the side effects of constructing the state matter here.
        new WaitingForResources(ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);

        ctx.setExpectCreatingExecutionGraph();
        ctx.runScheduledTasks();
    }

    /**
     * Self-test of the mock scheduler: tasks must run ordered by delay, not by submission
     * order. The task with the largest delay is submitted first and must run last.
     */
    @Test
    void testInternalRunScheduledTasks_correctExecutionOrder() {
        final AtomicBoolean firstRun = new AtomicBoolean(false);
        final AtomicBoolean secondRun = new AtomicBoolean(false);
        final AtomicBoolean thirdRun = new AtomicBoolean(false);

        final Runnable first = () -> firstRun.set(true);
        final Runnable second =
                () -> {
                    Assertions.assertThat(firstRun).as("order violated").isTrue();
                    secondRun.set(true);
                };
        final Runnable third =
                () -> {
                    Assertions.assertThat(secondRun).as("order violated").isTrue();
                    thirdRun.set(true);
                };

        // Submission order is deliberately third, first, second.
        // NOTE(review): "first" and "second" share the same delay, so their relative order
        // depends on how the priority queue treats equal elements - keep this order as is.
        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), third, Duration.ofMillis(999L));
        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), first, Duration.ZERO);
        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), second, Duration.ZERO);

        ctx.runScheduledTasks();

        Assertions.assertThat(thirdRun).isTrue();
    }

    /**
     * Self-test of the mock scheduler: a task must be executed exactly once, even if the
     * scheduled tasks are run repeatedly.
     */
    @Test
    void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
        final AtomicBoolean executed = new AtomicBoolean(false);
        final Runnable runOnce =
                () -> {
                    // Fails on a second execution, because the flag is already set then.
                    Assertions.assertThat(executed).as("Multiple executions").isFalse();
                    executed.set(true);
                };

        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), runOnce, Duration.ZERO);

        ctx.runScheduledTasks();
        ctx.runScheduledTasks();

        Assertions.assertThat(executed).isTrue();
    }

    /**
     * Self-test of the mock scheduler: a task scheduled with a 10 ms delay must not run when
     * only tasks up to 4 ms are requested.
     */
    @Test
    void testInternalRunScheduledTasks_upperBoundRespected() {
        final Runnable mustNotRun = () -> Assertions.fail("Not expected");

        ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), mustNotRun, Duration.ofMillis(10L));

        ctx.runScheduledTasks(4L);
    }

    /**
     * Self-test of the mock scheduler: a task that is scheduled from within a running task
     * must be picked up by the same {@code runScheduledTasks} call if it falls within the bound.
     */
    @Test
    void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
        final AdaptiveSchedulerTest.DummyState state = new AdaptiveSchedulerTest.DummyState();
        final AtomicBoolean innerExecuted = new AtomicBoolean(false);

        final Runnable inner = () -> innerExecuted.set(true);
        // The outer task runs immediately and schedules the inner one with a 4 ms delay.
        final Runnable outer = () -> ctx.runIfState(state, inner, Duration.ofMillis(4L));

        ctx.runIfState(state, outer, Duration.ZERO);
        ctx.runScheduledTasks(10L);

        Assertions.assertThat(innerExecuted).isTrue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a consumer that asserts its argument is not {@code null}.
     *
     * @param <T> type of the consumed value
     * @return consumer failing with an assertion error when given {@code null}
     */
    public static <T> Consumer<T> assertNonNull() {
        return value -> Assertions.assertThat(value).isNotNull();
    }
}
