package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.jobmaster.slotpool.DeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclareResourceRequirementServiceConnectionManagerTest.class */
class DefaultDeclareResourceRequirementServiceConnectionManagerTest {
    private final ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService();
    private final JobID jobId = new JobID();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclareResourceRequirementServiceConnectionManagerTest$FailingDeclareResourceRequirementsService.class */
    public static final class FailingDeclareResourceRequirementsService implements DeclareResourceRequirementServiceConnectionManager.DeclareResourceRequirementsService {
        private final BlockingQueue<ResourceRequirements> resourceRequirements = new ArrayBlockingQueue(2);
        private final OneShotLatch declareResourceRequirementsLatch = new OneShotLatch();
        private int failureCounter;

        private FailingDeclareResourceRequirementsService(int i) {
            this.failureCounter = i;
        }

        public CompletableFuture<Acknowledge> declareResourceRequirements(ResourceRequirements resourceRequirements) {
            if (this.failureCounter <= 0) {
                this.resourceRequirements.offer(resourceRequirements);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            this.failureCounter--;
            this.declareResourceRequirementsLatch.trigger();
            return FutureUtils.completedExceptionally(new FlinkException("Test exception"));
        }

        private boolean hasResourceRequirements() {
            return !this.resourceRequirements.isEmpty();
        }

        private ResourceRequirements nextResourceRequirements() throws InterruptedException {
            return this.resourceRequirements.take();
        }

        public void waitForResourceRequirementsDeclaration() throws InterruptedException {
            this.declareResourceRequirementsLatch.await();
        }
    }

    DefaultDeclareResourceRequirementServiceConnectionManagerTest() {
    }

    @Test
    void testIgnoreDeclareResourceRequirementsIfNotConnected() {
        createResourceManagerConnectionManager().declareResourceRequirements(createResourceRequirements());
    }

    @Test
    void testDeclareResourceRequirementsSendsRequirementsIfConnected() {
        DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager = createResourceManagerConnectionManager();
        CompletableFuture completableFuture = new CompletableFuture();
        createResourceManagerConnectionManager.connect(resourceRequirements -> {
            completableFuture.complete(resourceRequirements);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        ResourceRequirements createResourceRequirements = createResourceRequirements();
        createResourceManagerConnectionManager.declareResourceRequirements(createResourceRequirements);
        Assertions.assertThat((ResourceRequirements) completableFuture.join()).isEqualTo(createResourceRequirements);
    }

    @Test
    void testRetryDeclareResourceRequirementsIfTransmissionFailed() throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager = createResourceManagerConnectionManager();
        FailingDeclareResourceRequirementsService failingDeclareResourceRequirementsService = new FailingDeclareResourceRequirementsService(4);
        createResourceManagerConnectionManager.connect(failingDeclareResourceRequirementsService);
        ResourceRequirements createResourceRequirements = createResourceRequirements();
        createResourceManagerConnectionManager.declareResourceRequirements(createResourceRequirements);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assertions.assertThat(failingDeclareResourceRequirementsService.nextResourceRequirements()).isEqualTo(createResourceRequirements);
        Assertions.assertThat(failingDeclareResourceRequirementsService.hasResourceRequirements()).isFalse();
    }

    @Test
    void testDisconnectStopsSendingResourceRequirements() throws InterruptedException {
        runStopSendingResourceRequirementsTest((v0) -> {
            v0.disconnect();
        });
    }

    @Test
    void testCloseStopsSendingResourceRequirements() throws InterruptedException {
        runStopSendingResourceRequirementsTest((v0) -> {
            v0.close();
        });
    }

    private void runStopSendingResourceRequirementsTest(Consumer<DeclareResourceRequirementServiceConnectionManager> consumer) throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager = createResourceManagerConnectionManager();
        FailingDeclareResourceRequirementsService failingDeclareResourceRequirementsService = new FailingDeclareResourceRequirementsService(1);
        createResourceManagerConnectionManager.connect(failingDeclareResourceRequirementsService);
        createResourceManagerConnectionManager.declareResourceRequirements(createResourceRequirements());
        failingDeclareResourceRequirementsService.waitForResourceRequirementsDeclaration();
        consumer.accept(createResourceManagerConnectionManager);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assertions.assertThat(failingDeclareResourceRequirementsService.hasResourceRequirements()).isFalse();
    }

    @Test
    void testNewResourceRequirementsOverrideOldRequirements() throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager = createResourceManagerConnectionManager();
        ResourceRequirements createResourceRequirements = createResourceRequirements(Arrays.asList(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
        ResourceRequirements createResourceRequirements2 = createResourceRequirements(Arrays.asList(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2)));
        FailingDeclareResourceRequirementsService failingDeclareResourceRequirementsService = new FailingDeclareResourceRequirementsService(1);
        createResourceManagerConnectionManager.connect(failingDeclareResourceRequirementsService);
        createResourceManagerConnectionManager.declareResourceRequirements(createResourceRequirements);
        failingDeclareResourceRequirementsService.waitForResourceRequirementsDeclaration();
        createResourceManagerConnectionManager.declareResourceRequirements(createResourceRequirements2);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assertions.assertThat(failingDeclareResourceRequirementsService.nextResourceRequirements()).isEqualTo(createResourceRequirements2);
        Assertions.assertThat(failingDeclareResourceRequirementsService.hasResourceRequirements()).isFalse();
    }

    @Nonnull
    private ResourceRequirements createResourceRequirements() {
        return createResourceRequirements(Arrays.asList(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2)));
    }

    private ResourceRequirements createResourceRequirements(List<ResourceRequirement> list) {
        return ResourceRequirements.create(this.jobId, "localhost", list);
    }

    @Nonnull
    private DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager() {
        return DefaultDeclareResourceRequirementServiceConnectionManager.create(this.scheduledExecutor);
    }
}
