package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/** Tests for the resource manager, exercised through a {@link TestingResourceManager} instance. */
public class ResourceManagerTest extends TestLogger {
    /** Upper bound used for RPC calls and for waiting on resource manager start-up and shutdown. */
    private static final Time TIMEOUT = Time.minutes(2);
    /**
     * Heartbeat configuration applied by {@code ResourceManagerBuilder} when a test does not supply its own.
     * NOTE(review): the arguments presumably are (interval, timeout) in milliseconds; confirm against HeartbeatServices.
     */
    private static final HeartbeatServices heartbeatServices = new HeartbeatServices(1000, 10000);
    /** Heartbeat configuration used by {@code runHeartbeatTimeoutTest}. */
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServices(1, 1);
    /**
     * Heartbeat configuration used by {@code runHeartbeatTargetBecomesUnreachableTest}.
     * NOTE(review): the third argument presumably is the number of failed heartbeat RPCs tolerated
     * before the target is treated as unreachable; confirm against HeartbeatServices.
     */
    private static final HeartbeatServices failedRpcEnabledHeartbeatServices = new HeartbeatServices(1, 10000000, 1);
    /** Hardware description passed with every registration made through {@code registerTaskExecutor}. */
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337, 1337, 0);
    /** Data port passed with every registration made through {@code registerTaskExecutor}. */
    private static final int dataPort = 1234;
    /** JMX port passed with every registration made through {@code registerTaskExecutor}. */
    private static final int jmxPort = 23456;
    /** RPC service shared by all tests; created in {@code setupClass} and terminated in {@code tearDownClass}. */
    private static TestingRpcService rpcService;
    /** Per-test high-availability services; created in {@code setup} and cleaned up in {@code after}. */
    private TestingHighAvailabilityServices highAvailabilityServices;
    /** Per-test fatal error handler; {@code after} rethrows any error it has recorded. */
    private TestingFatalErrorHandler testingFatalErrorHandler;
    /** Resource id handed to the resource manager under test; regenerated in {@code setup}. */
    private ResourceID resourceManagerResourceId;
    /** Resource manager under test; assigned by the individual test and terminated in {@code after}. */
    private TestingResourceManager resourceManager;
    /** Id of the most recently built resource manager; assigned by {@code ResourceManagerBuilder.buildAndStart}. */
    private ResourceManagerId resourceManagerId;

    /**
     * Fluent builder that assembles and starts a {@link TestingResourceManager} for a single test.
     *
     * <p>Every collaborator that has not been set explicitly is replaced by a default when
     * {@link #buildAndStart()} runs.
     */
    public class ResourceManagerBuilder {
        private HeartbeatServices heartbeatServices;
        private JobLeaderIdService jobLeaderIdService;
        private SlotManager slotManager;
        private Function<ResourceID, Boolean> stopWorkerFunction;

        private ResourceManagerBuilder() {
            // All collaborators start out unset (null); buildAndStart() fills in the defaults.
        }

        /** Sets the heartbeat services to use instead of the test-wide default. */
        public ResourceManagerBuilder withHeartbeatServices(HeartbeatServices heartbeatServices) {
            this.heartbeatServices = heartbeatServices;
            return this;
        }

        /** Sets the job leader id service to use instead of a {@link DefaultJobLeaderIdService}. */
        public ResourceManagerBuilder withJobLeaderIdService(JobLeaderIdService jobLeaderIdService) {
            this.jobLeaderIdService = jobLeaderIdService;
            return this;
        }

        /** Sets the slot manager to use instead of the one built by {@link DeclarativeSlotManagerBuilder}. */
        public ResourceManagerBuilder withSlotManager(SlotManager slotManager) {
            this.slotManager = slotManager;
            return this;
        }

        /** Sets the function handed to the resource manager as its stop-worker function. */
        public ResourceManagerBuilder withStopWorkerFunction(Function<ResourceID, Boolean> function) {
            this.stopWorkerFunction = function;
            return this;
        }

        /**
         * Creates the resource manager, starts it and waits (bounded by {@code TIMEOUT}) until its
         * started-future completes.
         *
         * <p>As a side effect a freshly generated {@link ResourceManagerId} is stored in the enclosing
         * test's {@code resourceManagerId} field.
         *
         * @return the started resource manager
         * @throws Exception if construction, start-up or waiting for start-up fails
         */
        public TestingResourceManager buildAndStart() throws Exception {
            applyDefaults();

            final ResourceManagerId freshId = ResourceManagerId.generate();
            resourceManagerId = freshId;

            final TestingResourceManager startedManager =
                    new TestingResourceManager(
                            rpcService,
                            freshId.toUUID(),
                            resourceManagerResourceId,
                            this.heartbeatServices,
                            this.slotManager,
                            NoOpResourceManagerPartitionTracker::get,
                            this.jobLeaderIdService,
                            testingFatalErrorHandler,
                            UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
                            this.stopWorkerFunction);
            startedManager.start();
            startedManager.getStartedFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
            return startedManager;
        }

        /** Replaces every collaborator that is still unset with its default. */
        private void applyDefaults() {
            if (this.heartbeatServices == null) {
                // Qualified access: the builder field shadows the static field of the enclosing test.
                this.heartbeatServices = ResourceManagerTest.heartbeatServices;
            }
            if (this.jobLeaderIdService == null) {
                this.jobLeaderIdService =
                        new DefaultJobLeaderIdService(
                                highAvailabilityServices,
                                rpcService.getScheduledExecutor(),
                                TestingUtils.infiniteTime());
            }
            if (this.slotManager == null) {
                this.slotManager =
                        DeclarativeSlotManagerBuilder.newBuilder()
                                .setScheduledExecutor(rpcService.getScheduledExecutor())
                                .build();
            }
            if (this.stopWorkerFunction == null) {
                this.stopWorkerFunction = ignoredResourceId -> false;
            }
        }
    }

    /** Creates the {@link TestingRpcService} stored in the static {@code rpcService} field before any test runs. */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /**
     * Creates the per-test fixtures: high-availability services with a testing leader election
     * service for the resource manager, a fatal error handler and a fresh resource id.
     */
    @Before
    public void setup() throws Exception {
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices = haServices;
        haServices.setResourceManagerLeaderElectionService(new TestingLeaderElectionService());

        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    /**
     * Tears down the per-test fixtures.
     *
     * <p>The gateways registered on the static, class-wide {@code rpcService} are cleared in a
     * {@code finally} block so that a failure while terminating the resource manager, cleaning up
     * the high-availability services or rethrowing a recorded fatal error cannot leak this test's
     * gateways into the next one. The high-availability services are likewise cleaned up even when
     * terminating the resource manager fails.
     *
     * @throws Exception if terminating the resource manager or cleaning up the services fails
     */
    @After
    public void after() throws Exception {
        try {
            try {
                if (this.resourceManager != null) {
                    RpcUtils.terminateRpcEndpoint(this.resourceManager, TIMEOUT);
                }
            } finally {
                if (this.highAvailabilityServices != null) {
                    this.highAvailabilityServices.closeAndCleanupAllData();
                }
            }
            // The handler is null if setup() failed before creating it; nothing to rethrow then.
            if (this.testingFatalErrorHandler != null && this.testingFatalErrorHandler.hasExceptionOccurred()) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            if (rpcService != null) {
                rpcService.clearGateways();
            }
        }
    }

    /** Terminates the shared RPC service, if one was created, waiting at most {@code TIMEOUT}. */
    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService == null) {
            return;
        }
        final RpcService[] servicesToStop = {rpcService};
        RpcUtils.terminateRpcServices(TIMEOUT, servicesToStop);
    }

    /**
     * Registers a task executor and verifies that the details reported by the resource manager
     * match the values the executor was registered with.
     */
    @Test
    public void testRequestTaskManagerInfo() throws Exception {
        final ResourceID taskManagerId = ResourceID.generate();
        final RpcGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        this.resourceManager = new ResourceManagerBuilder().buildAndStart();
        final ResourceManagerGateway resourceManagerGateway =
                this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());

        final TaskManagerInfoWithSlots infoWithSlots =
                resourceManagerGateway.requestTaskManagerDetailsInfo(taskManagerId, TestingUtils.TIMEOUT).get();
        final TaskManagerInfo taskManagerInfo = infoWithSlots.getTaskManagerInfo();

        Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
        Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
        Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
        // Compare against the constants used by registerTaskExecutor rather than repeating their values.
        Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
        Assert.assertEquals(jmxPort, taskManagerInfo.getJmxPort());
        Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
        Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
        MatcherAssert.assertThat(infoWithSlots.getAllocatedSlots(), Matchers.is(Matchers.empty()));
    }

    /**
     * Registers a task executor and verifies that the thread-info gateway returned by the
     * resource manager is the gateway that was registered.
     */
    @Test
    public void testRequestTaskExecutorGateway() throws Exception {
        final ResourceID taskManagerId = ResourceID.generate();
        final String gatewayAddress = UUID.randomUUID().toString();
        final RpcGateway registeredGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(gatewayAddress)
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(registeredGateway.getAddress(), registeredGateway);

        this.resourceManager = new ResourceManagerBuilder().buildAndStart();
        final ResourceManagerGateway gateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerTaskExecutor(gateway, taskManagerId, registeredGateway.getAddress());

        final TaskExecutorThreadInfoGateway returnedGateway =
                gateway.requestTaskExecutorThreadInfoGateway(taskManagerId, TestingUtils.TIMEOUT).get();
        Assert.assertEquals(registeredGateway, returnedGateway);
    }

    /**
     * Registers a task executor with the given resource manager and asserts that the registration
     * succeeded.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param resourceID resource id of the task executor
     * @param str address of the task executor
     * @throws Exception if waiting for the registration response fails
     */
    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, String str) throws Exception {
        final TaskExecutorMemoryConfiguration memoryConfiguration =
                new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        final TaskExecutorRegistration registration =
                new TaskExecutorRegistration(
                        str,
                        resourceID,
                        dataPort,
                        jmxPort,
                        hardwareDescription,
                        memoryConfiguration,
                        ResourceProfile.ZERO,
                        ResourceProfile.ZERO);

        final RegistrationResponse response =
                resourceManagerGateway.registerTaskExecutor(registration, TestingUtils.TIMEOUT).get();
        MatcherAssert.assertThat(response, Matchers.instanceOf(RegistrationResponse.Success.class));
    }

    /**
     * Verifies that disconnecting a job manager makes the resource manager invoke the slot
     * manager's clear-requirements callback with that job's id.
     */
    @Test
    public void testDisconnectJobManagerClearsRequirements() throws Exception {
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        // Always report the testing gateway's fencing token as the leader id.
        final TestingJobLeaderIdService jobLeaderIdService =
                TestingJobLeaderIdService.newBuilder()
                        .setGetLeaderIdFunction(
                                ignoredJobId -> CompletableFuture.completedFuture(jobMasterGateway.getFencingToken()))
                        .build();

        final CompletableFuture<JobID> clearRequirementsFuture = new CompletableFuture<>();
        final SlotManager slotManager =
                new TestingSlotManagerBuilder()
                        .setClearRequirementsConsumer(clearRequirementsFuture::complete)
                        .createSlotManager();

        this.resourceManager =
                new ResourceManagerBuilder()
                        .withJobLeaderIdService(jobLeaderIdService)
                        .withSlotManager(slotManager)
                        .buildAndStart();

        final JobID jobId = JobID.generate();
        final ResourceManagerGateway resourceManagerGateway =
                this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway
                .registerJobMaster(
                        jobMasterGateway.getFencingToken(),
                        ResourceID.generate(),
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT)
                .get();
        resourceManagerGateway
                .declareRequiredResources(
                        jobMasterGateway.getFencingToken(),
                        ResourceRequirements.create(
                                jobId,
                                jobMasterGateway.getAddress(),
                                Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))),
                        TIMEOUT)
                .get();

        resourceManagerGateway.disconnectJobManager(jobId, JobStatus.FINISHED, new FlinkException("Test exception"));

        MatcherAssert.assertThat(clearRequirementsFuture.get(5L, TimeUnit.SECONDS), Matchers.is(jobId));
    }

    /**
     * Verifies that a registered job master is told to disconnect from the resource manager when
     * the resource manager runs with {@code fastHeartbeatServices}.
     */
    @Test
    public void testHeartbeatTimeoutWithJobMaster() throws Exception {
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceManagerId> disconnectFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setResourceManagerHeartbeatFunction(
                                resourceId -> {
                                    heartbeatRequestFuture.complete(resourceId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                        .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final JobID jobId = new JobID();
        final ResourceID jobMasterResourceId = ResourceID.generate();
        final SettableLeaderRetrievalService leaderRetrievalService =
                new SettableLeaderRetrievalService(
                        jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
                requestedJobId -> {
                    MatcherAssert.assertThat(requestedJobId, Matchers.is(Matchers.equalTo(jobId)));
                    return leaderRetrievalService;
                });

        runHeartbeatTimeoutTest(
                builder -> {
                    // No additional configuration needed for the job master case.
                },
                resourceManagerGateway ->
                        MatcherAssert.assertThat(
                                resourceManagerGateway
                                        .registerJobMaster(
                                                jobMasterGateway.getFencingToken(),
                                                jobMasterResourceId,
                                                jobMasterGateway.getAddress(),
                                                jobId,
                                                TIMEOUT)
                                        .get(),
                                Matchers.instanceOf(RegistrationResponse.Success.class)),
                resourceManagerResourceId -> {
                    // A heartbeat request may or may not have arrived before the disconnect.
                    MatcherAssert.assertThat(
                            heartbeatRequestFuture.getNow(null),
                            Matchers.anyOf(
                                    Matchers.is(resourceManagerResourceId),
                                    Matchers.is(Matchers.nullValue())));
                    MatcherAssert.assertThat(
                            disconnectFuture.get(), Matchers.is(Matchers.equalTo(this.resourceManagerId)));
                });
    }

    /**
     * Verifies that a job master whose heartbeat RPC fails with a
     * {@link RecipientUnreachableException} is told to disconnect from the resource manager.
     */
    @Test
    public void testJobMasterBecomesUnreachableTriggersDisconnect() throws Exception {
        final JobID jobId = new JobID();
        final ResourceID jobMasterResourceId = ResourceID.generate();
        final CompletableFuture<ResourceManagerId> disconnectFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .setResourceManagerHeartbeatFunction(
                                resourceId ->
                                        FutureUtils.completedExceptionally(
                                                new RecipientUnreachableException(
                                                        "sender", "recipient", "job master is unreachable")))
                        .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                        .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final SettableLeaderRetrievalService leaderRetrievalService =
                new SettableLeaderRetrievalService(
                        jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
                requestedJobId -> {
                    MatcherAssert.assertThat(requestedJobId, Matchers.is(Matchers.equalTo(jobId)));
                    return leaderRetrievalService;
                });

        runHeartbeatTargetBecomesUnreachableTest(
                builder -> {
                    // No additional configuration needed for the job master case.
                },
                resourceManagerGateway ->
                        MatcherAssert.assertThat(
                                resourceManagerGateway
                                        .registerJobMaster(
                                                jobMasterGateway.getFencingToken(),
                                                jobMasterResourceId,
                                                jobMasterGateway.getAddress(),
                                                jobId,
                                                TIMEOUT)
                                        .get(),
                                Matchers.instanceOf(RegistrationResponse.Success.class)),
                ignoredResourceManagerResourceId ->
                        MatcherAssert.assertThat(
                                disconnectFuture.get(), Matchers.is(Matchers.equalTo(this.resourceManagerId))));
    }

    /**
     * Verifies that a registered task executor is disconnected with a {@link TimeoutException} and
     * that the stop-worker function is invoked for it when the resource manager runs with
     * {@code fastHeartbeatServices}.
     */
    @Test
    public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<Exception> disconnectFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceID> stopWorkerFuture = new CompletableFuture<>();
        final RpcGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                        .setHeartbeatResourceManagerFunction(
                                resourceId -> {
                                    heartbeatRequestFuture.complete(resourceId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        runHeartbeatTimeoutTest(
                builder ->
                        builder.withStopWorkerFunction(
                                stoppedWorkerId -> {
                                    stopWorkerFuture.complete(stoppedWorkerId);
                                    return true;
                                }),
                resourceManagerGateway ->
                        registerTaskExecutor(
                                resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress()),
                resourceManagerResourceId -> {
                    // A heartbeat request may or may not have arrived before the disconnect.
                    MatcherAssert.assertThat(
                            heartbeatRequestFuture.getNow(null),
                            Matchers.anyOf(
                                    Matchers.is(resourceManagerResourceId),
                                    Matchers.is(Matchers.nullValue())));
                    MatcherAssert.assertThat(disconnectFuture.get(), Matchers.instanceOf(TimeoutException.class));
                    MatcherAssert.assertThat(stopWorkerFuture.get(), Matchers.is(taskExecutorId));
                });
    }

    /**
     * Verifies that a task executor whose heartbeat RPC fails with a
     * {@link RecipientUnreachableException} is disconnected with a
     * {@link ResourceManagerException} and that the stop-worker function is invoked for it.
     */
    @Test
    public void testTaskExecutorBecomesUnreachableTriggersDisconnect() throws Exception {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<Exception> disconnectFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceID> stopWorkerFuture = new CompletableFuture<>();
        final RpcGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                        .setHeartbeatResourceManagerFunction(
                                resourceId ->
                                        FutureUtils.completedExceptionally(
                                                new RecipientUnreachableException(
                                                        "sender", "recipient", "task executor is unreachable")))
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        runHeartbeatTargetBecomesUnreachableTest(
                builder ->
                        builder.withStopWorkerFunction(
                                stoppedWorkerId -> {
                                    stopWorkerFuture.complete(stoppedWorkerId);
                                    return true;
                                }),
                resourceManagerGateway ->
                        registerTaskExecutor(
                                resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress()),
                ignoredResourceManagerResourceId -> {
                    MatcherAssert.assertThat(
                            disconnectFuture.get(), Matchers.instanceOf(ResourceManagerException.class));
                    MatcherAssert.assertThat(stopWorkerFuture.get(), Matchers.is(taskExecutorId));
                });
    }

    /**
     * Runs the shared disconnect scenario with {@link JobStatus#CANCELED}; for a status that reports
     * itself as globally terminal the helper waits for the remove-job callback.
     */
    @Test
    public void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception {
        testDisconnectJobManager(JobStatus.CANCELED);
    }

    /**
     * Runs the shared disconnect scenario with {@link JobStatus#FAILING}; for a status that does not
     * report itself as globally terminal the helper expects the remove-job callback not to fire.
     */
    @Test
    public void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception {
        testDisconnectJobManager(JobStatus.FAILING);
    }

    /**
     * Verifies that disconnecting a task manager notifies the task executor with a
     * {@link FlinkException} and invokes the stop-worker function with its resource id.
     */
    @Test
    public void testDisconnectTaskManager() throws Exception {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<Exception> disconnectFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceID> stopWorkerFuture = new CompletableFuture<>();

        final RpcGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        // complete(...) returns a boolean, which doubles as the stop-worker function's result.
        this.resourceManager =
                new ResourceManagerBuilder()
                        .withStopWorkerFunction(stopWorkerFuture::complete)
                        .buildAndStart();

        registerTaskExecutor(this.resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
        this.resourceManager.disconnectTaskManager(taskExecutorId, new FlinkException("Test exception"));

        MatcherAssert.assertThat(disconnectFuture.get(), Matchers.instanceOf(FlinkException.class));
        MatcherAssert.assertThat(stopWorkerFuture.get(), Matchers.is(taskExecutorId));
    }

    /**
     * Registers a job master, disconnects it with the given status and checks whether the job
     * leader id service's remove-job callback fires.
     *
     * <p>For a globally terminal status the callback must fire; otherwise it must not fire within
     * a short grace period.
     *
     * @param jobStatus status passed to {@code disconnectJobManager}
     * @throws Exception if setup fails or waiting on a latch is interrupted
     */
    private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final OneShotLatch jobAdded = new OneShotLatch();
        final OneShotLatch jobRemoved = new OneShotLatch();
        final TestingJobLeaderIdService jobLeaderIdService =
                TestingJobLeaderIdService.newBuilder()
                        .setAddJobConsumer(ignoredJobId -> jobAdded.trigger())
                        .setRemoveJobConsumer(ignoredJobId -> jobRemoved.trigger())
                        .build();
        this.resourceManager =
                new ResourceManagerBuilder()
                        .withJobLeaderIdService(jobLeaderIdService)
                        .buildAndStart();

        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
                requestedJobId ->
                        new SettableLeaderRetrievalService(
                                jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()));

        final JobID jobId = JobID.generate();
        final ResourceManagerGateway resourceManagerGateway =
                this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        // The registration response is not awaited; the add-job latch signals that the job is known.
        resourceManagerGateway.registerJobMaster(
                jobMasterGateway.getFencingToken(),
                ResourceID.generate(),
                jobMasterGateway.getAddress(),
                jobId,
                TIMEOUT);
        jobAdded.await();

        resourceManagerGateway.disconnectJobManager(jobId, jobStatus, new FlinkException("Test exception"));

        if (jobStatus.isGloballyTerminalState()) {
            jobRemoved.await();
        } else {
            try {
                jobRemoved.await(10L, TimeUnit.MILLISECONDS);
                Assert.fail("We should not have removed the job.");
            } catch (TimeoutException expected) {
                // Expected: the remove-job callback must not fire for a non-terminal status.
            }
        }
    }

    /**
     * Starts a resource manager configured with {@code fastHeartbeatServices} and drives a
     * heartbeat scenario against it.
     *
     * @param consumer customizes the builder before the heartbeat services are applied
     * @param throwingConsumer registers the heartbeat target through the resource manager gateway
     * @param throwingConsumer2 verifies the outcome; receives the resource manager's resource id
     * @throws Exception if starting the resource manager or one of the callbacks fails
     */
    private void runHeartbeatTimeoutTest(Consumer<ResourceManagerBuilder> consumer, ThrowingConsumer<ResourceManagerGateway, Exception> throwingConsumer, ThrowingConsumer<ResourceID, Exception> throwingConsumer2) throws Exception {
        final ResourceManagerBuilder builder = new ResourceManagerBuilder();
        consumer.accept(builder);
        builder.withHeartbeatServices(fastHeartbeatServices);
        this.resourceManager = builder.buildAndStart();

        final ResourceManagerGateway gateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        throwingConsumer.accept(gateway);
        throwingConsumer2.accept(this.resourceManagerResourceId);
    }

    /**
     * Starts a resource manager configured with {@code failedRpcEnabledHeartbeatServices} and
     * drives a heartbeat scenario against it.
     *
     * @param consumer customizes the builder before the heartbeat services are applied
     * @param throwingConsumer registers the heartbeat target through the resource manager gateway
     * @param throwingConsumer2 verifies the outcome; receives the resource manager's resource id
     * @throws Exception if starting the resource manager or one of the callbacks fails
     */
    private void runHeartbeatTargetBecomesUnreachableTest(Consumer<ResourceManagerBuilder> consumer, ThrowingConsumer<ResourceManagerGateway, Exception> throwingConsumer, ThrowingConsumer<ResourceID, Exception> throwingConsumer2) throws Exception {
        final ResourceManagerBuilder builder = new ResourceManagerBuilder();
        consumer.accept(builder);
        builder.withHeartbeatServices(failedRpcEnabledHeartbeatServices);
        this.resourceManager = builder.buildAndStart();

        final ResourceManagerGateway gateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        throwingConsumer.accept(gateway);
        throwingConsumer2.accept(this.resourceManagerResourceId);
    }
}
