/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for the resource manager, exercised through a {@link TestingResourceManager} that is
 * started on a shared {@link TestingRpcService}.
 *
 * <p>Each test creates its own resource manager via
 * {@link #createAndStartResourceManager(HeartbeatServices)}; {@link #after()} terminates it and
 * rethrows any error captured by the {@link TestingFatalErrorHandler}.
 */
public class ResourceManagerTest
extends TestLogger {
    private static final Time TIMEOUT = Time.minutes(2L);
    private static final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
    // Very small heartbeat settings, used by the heartbeat timeout tests.
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServices(1L, 1L);
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337L, 1337L, 0L);
    private static final int dataPort = 1234;
    private static TestingRpcService rpcService;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingLeaderElectionService resourceManagerLeaderElectionService;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceID resourceManagerResourceId;
    private TestingResourceManager resourceManager;
    private ResourceManagerId resourceManagerId;

    /** Creates the RPC service shared by all tests of this class. */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /** Creates fresh high-availability services, fatal error handler and resource id per test. */
    @Before
    public void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderElectionService = new TestingLeaderElectionService();
        this.highAvailabilityServices.setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService);
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    /**
     * Terminates the resource manager (if one was started), cleans up the high-availability
     * services and rethrows any error recorded by the fatal error handler.
     */
    @After
    public void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(this.resourceManager, TIMEOUT);
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
        }
        // Guarded like the fields above: setup() may have failed before assigning the handler.
        if (this.testingFatalErrorHandler != null && this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    /** Terminates the shared RPC service. */
    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcServices(TIMEOUT, rpcService);
        }
    }

    /**
     * Registers a task executor and checks that the {@link TaskManagerInfo} returned for it
     * carries the values used at registration.
     */
    @Test
    public void testRequestTaskManagerInfo() throws Exception {
        final ResourceID taskManagerId = ResourceID.generate();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
            .setAddress(UUID.randomUUID().toString())
            .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        this.resourceManager = this.createAndStartResourceManager(heartbeatServices);
        final ResourceManagerGateway resourceManagerGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());

        final CompletableFuture<TaskManagerInfo> taskManagerInfoFuture =
            resourceManagerGateway.requestTaskManagerInfo(taskManagerId, TestingUtils.TIMEOUT());
        final TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get();

        Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
        Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
        Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
        Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
        Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
        Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
    }

    /**
     * Registers a task executor at the given resource manager gateway and asserts that the
     * registration response is a {@link RegistrationResponse.Success}.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param taskExecutorId resource id of the task executor
     * @param taskExecutorAddress address under which the task executor gateway was registered
     */
    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception {
        final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
            taskExecutorAddress,
            taskExecutorId,
            dataPort,
            hardwareDescription,
            ResourceProfile.ZERO,
            ResourceProfile.ZERO);
        final CompletableFuture<RegistrationResponse> registrationFuture =
            resourceManagerGateway.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT());
        Assert.assertThat(registrationFuture.get(), Matchers.instanceOf(RegistrationResponse.Success.class));
    }

    /**
     * Registers a job master and expects the resource manager to disconnect from it, passing
     * its own {@link ResourceManagerId}.
     */
    @Test
    public void testHeartbeatTimeoutWithJobMaster() throws Exception {
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceManagerId> disconnectFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
            .setResourceManagerHeartbeatConsumer(heartbeatRequestFuture::complete)
            .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
            .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final JobID jobId = new JobID();
        final ResourceID jobMasterResourceId = ResourceID.generate();
        final SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(
            jobMasterGateway.getAddress(),
            jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assert.assertThat(requestedJobId, Matchers.is(Matchers.equalTo(jobId)));
            return jobMasterLeaderRetrievalService;
        });

        this.runHeartbeatTimeoutTest(
            resourceManagerGateway -> {
                final CompletableFuture<RegistrationResponse> registrationFuture = resourceManagerGateway.registerJobManager(
                    jobMasterGateway.getFencingToken(),
                    jobMasterResourceId,
                    jobMasterGateway.getAddress(),
                    jobId,
                    TIMEOUT);
                Assert.assertThat(registrationFuture.get(), Matchers.instanceOf(RegistrationResponse.Success.class));
            },
            expectedHeartbeatOrigin -> {
                // The heartbeat request may or may not have arrived by now, so both the
                // resource manager's id and null are accepted.
                final ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
                Assert.assertThat(
                    optionalHeartbeatRequestOrigin,
                    Matchers.anyOf(Matchers.is(expectedHeartbeatOrigin), Matchers.is(Matchers.nullValue())));
                Assert.assertThat(disconnectFuture.get(), Matchers.is(Matchers.equalTo(this.resourceManagerId)));
            });
    }

    /**
     * Registers a task executor and expects the resource manager to disconnect from it with a
     * {@link TimeoutException} as the cause.
     */
    @Test
    public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<Exception> disconnectFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
            .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
            .setHeartbeatResourceManagerConsumer(heartbeatRequestFuture::complete)
            .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        this.runHeartbeatTimeoutTest(
            resourceManagerGateway -> this.registerTaskExecutor(resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress()),
            expectedHeartbeatOrigin -> {
                // The heartbeat request may or may not have arrived by now, so both the
                // resource manager's id and null are accepted.
                final ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
                Assert.assertThat(
                    optionalHeartbeatRequestOrigin,
                    Matchers.anyOf(Matchers.is(expectedHeartbeatOrigin), Matchers.is(Matchers.nullValue())));
                Assert.assertThat(disconnectFuture.get(), Matchers.instanceOf(TimeoutException.class));
            });
    }

    /**
     * Starts a resource manager with {@link #fastHeartbeatServices}, lets the caller register a
     * component at it and then runs the caller's verification.
     *
     * @param registerComponentAtResourceManager registers the component under test at the
     *     resource manager gateway
     * @param verifyHeartbeatTimeout verification step; receives the resource manager's resource id
     */
    private void runHeartbeatTimeoutTest(ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) throws Exception {
        this.resourceManager = this.createAndStartResourceManager(fastHeartbeatServices);
        final ResourceManagerGateway resourceManagerGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerComponentAtResourceManager.accept(resourceManagerGateway);
        verifyHeartbeatTimeout.accept(this.resourceManagerResourceId);
    }

    /**
     * Creates and starts a {@link TestingResourceManager}, then grants it leadership under a
     * freshly generated {@link ResourceManagerId} (stored in {@link #resourceManagerId}).
     *
     * @param heartbeatServices heartbeat services the resource manager is created with
     * @return the started resource manager, after the leadership grant has completed
     */
    private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices) throws Exception {
        final SlotManager slotManager = SlotManagerBuilder.newBuilder()
            .setScheduledExecutor(rpcService.getScheduledExecutor())
            .build();
        final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
            this.highAvailabilityServices,
            rpcService.getScheduledExecutor(),
            TestingUtils.infiniteTime());
        final TestingResourceManager startedResourceManager = new TestingResourceManager(
            rpcService,
            this.resourceManagerResourceId,
            this.highAvailabilityServices,
            heartbeatServices,
            slotManager,
            NoOpResourceManagerPartitionTracker::get,
            jobLeaderIdService,
            this.testingFatalErrorHandler,
            UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
        startedResourceManager.start();

        // Make the resource manager the leader before handing it to the test.
        this.resourceManagerId = ResourceManagerId.generate();
        this.resourceManagerLeaderElectionService.isLeader(this.resourceManagerId.toUUID()).get();
        return startedResourceManager;
    }
}

