/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.time.Duration;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerService;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ResourceManagerTaskExecutorTest {
    private static final Duration TIMEOUT = TestingUtils.infiniteTime();
    private static final ResourceProfile DEFAULT_SLOT_PROFILE = ResourceProfile.fromResources((double)1.0, (int)1234);
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static TestingRpcService rpcService;
    private TestingTaskExecutorGateway taskExecutorGateway;
    private final int dataPort = 1234;
    private final int jmxPort = 23456;
    private final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
    private ResourceID taskExecutorResourceID;
    private TestingResourceManagerService rmService;
    private ResourceManagerGateway rmGateway;
    private ResourceManagerGateway wronglyFencedGateway;

    ResourceManagerTaskExecutorTest() {
    }

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @BeforeEach
    void setup() throws Exception {
        rpcService = new TestingRpcService();
        this.createAndRegisterTaskExecutorGateway();
        this.taskExecutorResourceID = ResourceID.generate();
        this.createAndStartResourceManager();
        this.wronglyFencedGateway = rpcService.connect(this.rmGateway.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class).get();
    }

    private void createAndRegisterTaskExecutorGateway() {
        this.taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        rpcService.registerGateway(this.taskExecutorGateway.getAddress(), (RpcGateway)this.taskExecutorGateway);
    }

    private void createAndStartResourceManager() throws Exception {
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.rmService = TestingResourceManagerService.newBuilder().setRpcService(rpcService).setRmLeaderElection(leaderElection).build();
        this.rmService.start();
        this.rmService.isLeader(UUID.randomUUID()).join();
        this.rmGateway = this.rmService.getResourceManagerGateway().orElseThrow(() -> new AssertionError((Object)"RM not available after confirming leadership."));
    }

    @AfterEach
    void teardown() throws Exception {
        if (this.rmService != null) {
            this.rmService.rethrowFatalErrorIfAny();
            this.rmService.cleanUp();
        }
    }

    @AfterAll
    static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
        }
    }

    @Test
    void testRegisterTaskExecutor() throws Exception {
        CompletableFuture<RegistrationResponse> successfulFuture = this.registerTaskExecutor(this.rmGateway, this.taskExecutorGateway.getAddress());
        RegistrationResponse response = successfulFuture.get();
        Assertions.assertThat((Object)response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)this.rmGateway.requestTaskManagerDetailsInfo(this.taskExecutorResourceID, TIMEOUT).get();
        Assertions.assertThat((Object)taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId()).isEqualTo((Object)this.taskExecutorResourceID);
        CompletableFuture<RegistrationResponse> duplicateFuture = this.registerTaskExecutor(this.rmGateway, this.taskExecutorGateway.getAddress());
        RegistrationResponse duplicateResponse = duplicateFuture.get();
        Assertions.assertThat((Object)duplicateResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        Assertions.assertThat((Comparable)((TaskExecutorRegistrationSuccess)response).getRegistrationId()).isNotEqualTo((Object)((TaskExecutorRegistrationSuccess)duplicateResponse).getRegistrationId());
        Assertions.assertThat((int)((ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get()).getNumberTaskManagers()).isOne();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDelayedRegisterTaskExecutor() throws Exception {
        Duration fastTimeout = Duration.ofMillis(1L);
        try {
            OneShotLatch startConnection = new OneShotLatch();
            OneShotLatch finishConnection = new OneShotLatch();
            rpcService.setRpcGatewayFutureFunction(rpcGateway -> CompletableFuture.supplyAsync(() -> {
                startConnection.trigger();
                try {
                    finishConnection.await();
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return rpcGateway;
            }, EXECUTOR_EXTENSION.getExecutor()));
            TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(this.taskExecutorGateway.getAddress(), this.taskExecutorResourceID, 1234, 23456, this.hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE, this.taskExecutorGateway.getAddress());
            CompletableFuture firstFuture = this.rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
            ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture((CompletableFuture)firstFuture).as("Should have failed because connection to taskmanager is delayed beyond timeout", new Object[0])).eventuallyFails().withThrowableOfType(Exception.class).withCauseInstanceOf(TimeoutException.class).withMessageContaining("ResourceManagerGateway.registerTaskExecutor");
            startConnection.await();
            rpcService.resetRpcGatewayFutureFunction();
            CompletableFuture secondFuture = this.rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
            RegistrationResponse response = (RegistrationResponse)secondFuture.get();
            Assertions.assertThat((Object)response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
            SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(this.taskExecutorResourceID, 0), ResourceProfile.ANY));
            this.rmGateway.sendSlotReport(this.taskExecutorResourceID, ((TaskExecutorRegistrationSuccess)response).getRegistrationId(), slotReport, TIMEOUT).get();
            finishConnection.trigger();
            Thread.sleep(1L);
            TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)this.rmGateway.requestTaskManagerDetailsInfo(this.taskExecutorResourceID, TIMEOUT).get();
            Assertions.assertThat((Object)taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId()).isEqualTo((Object)this.taskExecutorResourceID);
            Assertions.assertThat((int)taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots()).isOne();
        }
        finally {
            rpcService.resetRpcGatewayFutureFunction();
        }
    }

    @Test
    void testDisconnectTaskExecutor() throws Exception {
        int numberSlots = 10;
        TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(this.taskExecutorGateway.getAddress(), this.taskExecutorResourceID, 1234, 23456, this.hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE.multiply(10), this.taskExecutorGateway.getAddress());
        RegistrationResponse registrationResponse = (RegistrationResponse)this.rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
        Assertions.assertThat((Object)registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        InstanceID registrationId = ((TaskExecutorRegistrationSuccess)registrationResponse).getRegistrationId();
        Collection<SlotStatus> slots = this.createSlots(10);
        SlotReport slotReport = new SlotReport(slots);
        this.rmGateway.sendSlotReport(this.taskExecutorResourceID, registrationId, slotReport, TIMEOUT).get();
        ResourceOverview resourceOverview = (ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get();
        Assertions.assertThat((int)resourceOverview.getNumberTaskManagers()).isOne();
        Assertions.assertThat((int)resourceOverview.getNumberRegisteredSlots()).isEqualTo(10);
        this.rmGateway.disconnectTaskManager(this.taskExecutorResourceID, (Exception)new FlinkException("testDisconnectTaskExecutor"));
        ResourceOverview afterDisconnectResourceOverview = (ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get();
        Assertions.assertThat((int)afterDisconnectResourceOverview.getNumberTaskManagers()).isZero();
        Assertions.assertThat((int)afterDisconnectResourceOverview.getNumberRegisteredSlots()).isZero();
    }

    private Collection<SlotStatus> createSlots(int numberSlots) {
        return IntStream.range(0, numberSlots).mapToObj(index -> new SlotStatus(new SlotID(this.taskExecutorResourceID, index), ResourceProfile.ANY)).collect(Collectors.toList());
    }

    @Test
    void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() {
        CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = this.registerTaskExecutor(this.wronglyFencedGateway, this.taskExecutorGateway.getAddress());
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(unMatchedLeaderFuture).withFailMessage("Should have failed because we are using a wrongly fenced ResourceManagerGateway.", new Object[0])).eventuallyFails().withThrowableOfType(ExecutionException.class).withCauseInstanceOf(FencingTokenException.class);
    }

    @Test
    void testRegisterTaskExecutorFromInvalidAddress() {
        String invalidAddress = "/taskExecutor2";
        CompletableFuture<RegistrationResponse> invalidAddressFuture = this.registerTaskExecutor(this.rmGateway, invalidAddress);
        FlinkAssertions.assertThatFuture(invalidAddressFuture).eventuallySucceeds().isInstanceOf(RegistrationResponse.Failure.class);
    }

    private CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, String taskExecutorAddress) {
        return resourceManagerGateway.registerTaskExecutor(new TaskExecutorRegistration(taskExecutorAddress, this.taskExecutorResourceID, 1234, 23456, this.hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), DEFAULT_SLOT_PROFILE, DEFAULT_SLOT_PROFILE, taskExecutorAddress), TIMEOUT);
    }
}

