package org.apache.flink.runtime.resourcemanager;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.class */
/**
 * Tests the interaction between a resource manager, reached through its
 * {@link ResourceManagerGateway}, and a task executor: registration, repeated and delayed
 * registration, slot reporting, disconnecting, and rejection of wrongly fenced or unreachable
 * callers.
 */
class ResourceManagerTaskExecutorTest {
    private static final Time TIMEOUT = TestingUtils.infiniteTime();
    private static final ResourceProfile DEFAULT_SLOT_PROFILE = ResourceProfile.fromResources(1.0d, 1234);

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    /** RPC service shared by all tests; created in {@link #setupClass()}, terminated in {@link #teardownClass()}. */
    private static TestingRpcService rpcService;

    private TestingTaskExecutorGateway taskExecutorGateway;
    private final int dataPort = 1234;
    private final int jmxPort = 23456;
    private final HardwareDescription hardwareDescription = new HardwareDescription(1, 2, 3, 4);
    private ResourceID taskExecutorResourceID;
    private TestingResourceManagerService rmService;
    private ResourceManagerGateway rmGateway;

    /** Gateway to the same resource manager address, but connected with a freshly generated fencing token. */
    private ResourceManagerGateway wronglyFencedGateway;

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /**
     * Registers a fresh task executor gateway, starts a resource manager and connects an
     * additional, wrongly fenced gateway to it.
     */
    @BeforeEach
    void setup() throws Exception {
        // The RPC service from setupClass() is reused on purpose: creating a new one here would
        // orphan the previous instance, which nothing would ever terminate.
        createAndRegisterTaskExecutorGateway();
        taskExecutorResourceID = ResourceID.generate();
        createAndStartResourceManager();
        wronglyFencedGateway =
                rpcService
                        .connect(rmGateway.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
                        .get();
    }

    private void createAndRegisterTaskExecutorGateway() {
        taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
    }

    /** Builds and starts the resource manager service and waits until it has been granted leadership. */
    private void createAndStartResourceManager() throws Exception {
        rmService =
                TestingResourceManagerService.newBuilder()
                        .setRpcService(rpcService)
                        .setRmLeaderElection(new TestingLeaderElection())
                        .build();
        rmService.start();
        rmService.isLeader(UUID.randomUUID()).join();
        rmGateway =
                rmService
                        .getResourceManagerGateway()
                        .orElseThrow(() -> new AssertionError("RM not available after confirming leadership."));
    }

    @AfterEach
    void teardown() throws Exception {
        if (rmService != null) {
            rmService.rethrowFatalErrorIfAny();
            rmService.cleanUp();
        }
    }

    @AfterAll
    static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /**
     * Registers the same task executor twice and checks that both attempts succeed with different
     * registration ids while only one task manager is counted.
     */
    @Test
    void testRegisterTaskExecutor() throws Exception {
        TaskExecutorRegistrationSuccess firstSuccess =
                assertRegistrationSuccess(registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress()).get());

        TaskManagerInfoWithSlots details =
                rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
        Assertions.assertThat(details.getTaskManagerInfo().getResourceId()).isEqualTo(taskExecutorResourceID);

        TaskExecutorRegistrationSuccess secondSuccess =
                assertRegistrationSuccess(registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress()).get());

        Assertions.assertThat(firstSuccess.getRegistrationId()).isNotEqualTo(secondSuccess.getRegistrationId());

        ResourceOverview overview = rmGateway.requestResourceOverview(TIMEOUT).get();
        Assertions.assertThat(overview.getNumberTaskManagers()).isOne();
    }

    /**
     * Lets a first registration time out while the connection to the task executor is artificially
     * blocked, then registers again successfully and checks that the reported slot is still visible
     * after the blocked connection attempt has been released.
     */
    @Test
    void testDelayedRegisterTaskExecutor() throws Exception {
        Time fastTimeout = Time.milliseconds(1L);
        try {
            OneShotLatch connectStarted = new OneShotLatch();
            OneShotLatch releaseConnect = new OneShotLatch();

            // Gateway lookups now complete only once releaseConnect has been triggered.
            rpcService.setRpcGatewayFutureFunction(
                    rpcGateway ->
                            CompletableFuture.supplyAsync(
                                    () -> {
                                        connectStarted.trigger();
                                        try {
                                            releaseConnect.await();
                                        } catch (InterruptedException ignored) {
                                            // Keep the interrupt visible to the executor thread
                                            // instead of silently dropping it.
                                            Thread.currentThread().interrupt();
                                        }
                                        return rpcGateway;
                                    },
                                    EXECUTOR_EXTENSION.getExecutor()));

            TaskExecutorRegistration registration =
                    createTaskExecutorRegistration(taskExecutorGateway.getAddress(), DEFAULT_SLOT_PROFILE);

            FlinkAssertions.assertThatFuture(rmGateway.registerTaskExecutor(registration, fastTimeout))
                    .as("Should have failed because connection to taskmanager is delayed beyond timeout")
                    .eventuallyFails()
                    .withThrowableOfType(Exception.class)
                    .withCauseInstanceOf(TimeoutException.class)
                    .withMessageContaining("ResourceManagerGateway.registerTaskExecutor");

            // Make sure the blocked connection attempt is in flight before restoring normal lookups.
            connectStarted.await();
            rpcService.resetRpcGatewayFutureFunction();

            TaskExecutorRegistrationSuccess success =
                    assertRegistrationSuccess(rmGateway.registerTaskExecutor(registration, TIMEOUT).get());

            rmGateway
                    .sendSlotReport(
                            taskExecutorResourceID,
                            success.getRegistrationId(),
                            new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.ANY)),
                            TIMEOUT)
                    .get();

            // Release the first, timed-out connection attempt and give it a moment to run.
            releaseConnect.trigger();
            Thread.sleep(1L);

            TaskManagerInfoWithSlots details =
                    rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
            Assertions.assertThat(details.getTaskManagerInfo().getResourceId()).isEqualTo(taskExecutorResourceID);
            Assertions.assertThat(details.getTaskManagerInfo().getNumberSlots()).isOne();
        } finally {
            // Always restore the default lookup so the shared RPC service is left clean.
            rpcService.resetRpcGatewayFutureFunction();
        }
    }

    /**
     * Registers a task executor with ten slots, disconnects it and checks that the resource
     * overview no longer reports any task manager or slot.
     */
    @Test
    void testDisconnectTaskExecutor() throws Exception {
        final int numberSlots = 10;
        TaskExecutorRegistration registration =
                createTaskExecutorRegistration(
                        taskExecutorGateway.getAddress(), DEFAULT_SLOT_PROFILE.multiply(numberSlots));

        TaskExecutorRegistrationSuccess success =
                assertRegistrationSuccess(rmGateway.registerTaskExecutor(registration, TIMEOUT).get());

        rmGateway
                .sendSlotReport(
                        taskExecutorResourceID,
                        success.getRegistrationId(),
                        new SlotReport(createSlots(numberSlots)),
                        TIMEOUT)
                .get();

        ResourceOverview overviewBefore = rmGateway.requestResourceOverview(TIMEOUT).get();
        Assertions.assertThat(overviewBefore.getNumberTaskManagers()).isOne();
        Assertions.assertThat(overviewBefore.getNumberRegisteredSlots()).isEqualTo(numberSlots);

        rmGateway.disconnectTaskManager(taskExecutorResourceID, new FlinkException("testDisconnectTaskExecutor"));

        ResourceOverview overviewAfter = rmGateway.requestResourceOverview(TIMEOUT).get();
        Assertions.assertThat(overviewAfter.getNumberTaskManagers()).isZero();
        Assertions.assertThat(overviewAfter.getNumberRegisteredSlots()).isZero();
    }

    /**
     * Creates {@code i} slot statuses, numbered {@code 0..i-1}, for the current task executor.
     *
     * @param i number of slot statuses to create
     * @return the slot statuses, each using {@link ResourceProfile#ANY}
     */
    private Collection<SlotStatus> createSlots(int i) {
        return IntStream.range(0, i)
                .mapToObj(slotIndex -> new SlotStatus(new SlotID(taskExecutorResourceID, slotIndex), ResourceProfile.ANY))
                .collect(Collectors.toList());
    }

    /** A registration sent through a gateway with a non-matching fencing token must be rejected. */
    @Test
    void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() {
        FlinkAssertions.assertThatFuture(registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress()))
                .withFailMessage("Should have failed because we are using a wrongly fenced ResourceManagerGateway.")
                .eventuallyFails()
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(FencingTokenException.class);
    }

    /** A registration naming an address with no registered gateway must be answered with a failure response. */
    @Test
    void testRegisterTaskExecutorFromInvalidAddress() {
        FlinkAssertions.assertThatFuture(registerTaskExecutor(rmGateway, "/taskExecutor2"))
                .eventuallySucceeds()
                .isInstanceOf(RegistrationResponse.Failure.class);
    }

    /**
     * Registers the current task executor under the given address.
     *
     * @param resourceManagerGateway gateway to send the registration through
     * @param str task executor address, passed as both the first and the last constructor argument
     * @return future holding the resource manager's response
     */
    private CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, String str) {
        return resourceManagerGateway.registerTaskExecutor(
                createTaskExecutorRegistration(str, DEFAULT_SLOT_PROFILE), TIMEOUT);
    }

    /**
     * Builds a registration for the current task executor from this test's fixed ports, hardware
     * description and memory configuration.
     *
     * @param address value used for both the first and the last constructor argument
     * @param totalResourceProfile profile passed after {@link #DEFAULT_SLOT_PROFILE}
     * @return the new registration
     */
    private TaskExecutorRegistration createTaskExecutorRegistration(String address, ResourceProfile totalResourceProfile) {
        return new TaskExecutorRegistration(
                address,
                taskExecutorResourceID,
                dataPort,
                jmxPort,
                hardwareDescription,
                new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                DEFAULT_SLOT_PROFILE,
                totalResourceProfile,
                address);
    }

    /**
     * Asserts that the response is a {@link TaskExecutorRegistrationSuccess} and returns it as such.
     *
     * @param response response received from the resource manager
     * @return the same response, downcast
     */
    private static TaskExecutorRegistrationSuccess assertRegistrationSuccess(RegistrationResponse response) {
        Assertions.assertThat(response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        return (TaskExecutorRegistrationSuccess) response;
    }
}
