package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerFactory;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.class */
/**
 * Tests for {@link ResourceManagerServiceImpl}.
 *
 * <p>Each test builds a service from a {@link TestingResourceManagerFactory} whose initialize /
 * terminate hooks are wired to {@link CompletableFuture}s, then drives leadership changes through
 * a {@link TestingLeaderElectionService} and observes which hooks fire and when.
 */
public class ResourceManagerServiceImplTest extends TestLogger {
    /** Upper bound for waiting on anything that is expected to complete. */
    private static final Time TIMEOUT = Time.seconds(10);

    /** Short wait used by {@link #assertNotComplete} to observe that a future stays pending. */
    private static final Time FAST_TIMEOUT = Time.milliseconds(50);

    private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
    private static final ClusterInformation clusterInformation = new ClusterInformation("localhost", 1234);
    private static final MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();

    // Shared by all tests of this class; created in setupClass().
    private static TestingRpcService rpcService;
    private static TestingHighAvailabilityServices haService;
    private static TestingFatalErrorHandler fatalErrorHandler;

    // Recreated for every test in setup().
    private TestingResourceManagerFactory.Builder rmFactoryBuilder;
    private TestingLeaderElectionService leaderElectionService;

    // Only set once a test calls createResourceManager(...); may stay null.
    private ResourceManagerServiceImpl resourceManagerService;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
        haService = new TestingHighAvailabilityServices();
        fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @Before
    public void setup() throws Exception {
        fatalErrorHandler.clearError();
        this.rmFactoryBuilder = new TestingResourceManagerFactory.Builder();
        this.leaderElectionService = new TestingLeaderElectionService();
        haService.setResourceManagerLeaderElectionService(this.leaderElectionService);
    }

    @After
    public void teardown() throws Exception {
        if (this.resourceManagerService != null) {
            this.resourceManagerService.close();
        }
        if (this.leaderElectionService != null) {
            this.leaderElectionService.stop();
        }
        // Surface any error reported to the fatal error handler during the test.
        if (fatalErrorHandler.hasExceptionOccurred()) {
            fatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, TIMEOUT);
        }
    }

    /** Creates the service with the shared default metric registry and starts it. */
    private void createAndStartResourceManager() throws Exception {
        createResourceManager();
        this.resourceManagerService.start();
    }

    /** Creates (but does not start) the service with the shared default metric registry. */
    private void createResourceManager() throws Exception {
        createResourceManager(metricRegistry);
    }

    /**
     * Creates (but does not start) the service under test from the current factory builder.
     *
     * @param registry metric registry handed to the service
     */
    private void createResourceManager(MetricRegistry registry) throws Exception {
        this.resourceManagerService =
                ResourceManagerServiceImpl.create(
                        this.rmFactoryBuilder.build(),
                        new Configuration(),
                        rpcService,
                        haService,
                        heartbeatServices,
                        fatalErrorHandler,
                        clusterInformation,
                        (String) null,
                        registry,
                        "localhost",
                        ForkJoinPool.commonPool());
    }

    @Test
    public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId);

        Assert.assertThat(await(startRmFuture), Matchers.is(leaderSessionId));
        Assert.assertThat(
                await(this.leaderElectionService.getConfirmationFuture()).getLeaderSessionId(),
                Matchers.is(leaderSessionId));
    }

    @Test
    public void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        // Keep the RM stuck in initialization until the test releases it.
        this.rmFactoryBuilder.setInitializeConsumer(
                ignored -> blockOnFuture(finishRmInitializationFuture));
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId);
        assertNotComplete(this.leaderElectionService.getConfirmationFuture());

        finishRmInitializationFuture.complete(null);
        Assert.assertThat(
                await(this.leaderElectionService.getConfirmationFuture()).getLeaderSessionId(),
                Matchers.is(leaderSessionId));
    }

    @Test
    public void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // First initialization fills future 1, any later one fills future 2.
                            if (startRmFuture1.isDone()) {
                                startRmFuture2.complete(uuid);
                            } else {
                                startRmFuture1.complete(uuid);
                            }
                        })
                .setTerminateConsumer(terminateRmFuture::complete);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId1);
        assertRmStarted();

        this.leaderElectionService.isLeader(leaderSessionId2);

        Assert.assertThat(await(terminateRmFuture), Matchers.is(leaderSessionId1));
        Assert.assertThat(await(startRmFuture2), Matchers.is(leaderSessionId2));
        Assert.assertThat(
                await(this.leaderElectionService.getConfirmationFuture()).getLeaderSessionId(),
                Matchers.is(leaderSessionId2));
    }

    @Test
    public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        this.rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // First initialization fills future 1, any later one fills future 2.
                            if (startRmFuture1.isDone()) {
                                startRmFuture2.complete(uuid);
                            } else {
                                startRmFuture1.complete(uuid);
                            }
                        })
                // Keep the old RM stuck in termination until the test releases it.
                .setTerminateConsumer(ignored -> blockOnFuture(finishRmTerminationFuture));
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId1);
        assertRmStarted();

        this.leaderElectionService.isLeader(leaderSessionId2);
        assertNotComplete(startRmFuture2);

        finishRmTerminationFuture.complete(null);
        Assert.assertThat(await(startRmFuture2), Matchers.is(leaderSessionId2));
        Assert.assertThat(
                await(this.leaderElectionService.getConfirmationFuture()).getLeaderSessionId(),
                Matchers.is(leaderSessionId2));
    }

    @Test
    public void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        // Deliberately created but not started.
        createResourceManager();

        this.leaderElectionService.isLeader(UUID.randomUUID());

        assertNotComplete(startRmFuture);
        assertNotComplete(this.leaderElectionService.getConfirmationFuture());
    }

    @Test
    public void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        createAndStartResourceManager();
        this.resourceManagerService.close();

        this.leaderElectionService.isLeader(UUID.randomUUID());

        assertNotComplete(startRmFuture);
        assertNotComplete(this.leaderElectionService.getConfirmationFuture());
    }

    @Test
    public void revokeLeadership_stopExistLeader() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId);
        assertRmStarted();

        this.leaderElectionService.notLeader();

        Assert.assertThat(await(terminateRmFuture), Matchers.is(leaderSessionId));
    }

    @Test
    public void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
        this.rmFactoryBuilder.setSupportMultiLeaderSession(false);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertRmStarted();

        this.leaderElectionService.notLeader();

        // Passes as soon as the service's termination future completes within TIMEOUT.
        await(this.resourceManagerService.getTerminationFuture());
    }

    @Test
    public void leaderRmTerminated_terminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setGetTerminationFutureFunction(
                (ignoredRm, ignoredSuperFuture) -> rmTerminationFuture);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId);
        assertRmStarted();

        rmTerminationFuture.complete(null);

        // Passes as soon as the service's termination future completes within TIMEOUT.
        await(this.resourceManagerService.getTerminationFuture());
    }

    @Test
    public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();

        this.rmFactoryBuilder
                .setTerminateConsumer(terminateRmFuture::complete)
                .setGetTerminationFutureFunction(
                        (ignoredRm, ignoredSuperFuture) -> rmTerminationFuture);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(leaderSessionId);
        assertRmStarted();

        this.leaderElectionService.notLeader();
        Assert.assertThat(await(terminateRmFuture), Matchers.is(leaderSessionId));

        rmTerminationFuture.complete(null);
        assertNotComplete(this.resourceManagerService.getTerminationFuture());
    }

    @Test
    public void closeService_stopRmAndLeaderElection() throws Exception {
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertRmStarted();
        Assert.assertFalse(this.leaderElectionService.isStopped());

        this.resourceManagerService.close();

        Assert.assertTrue(terminateRmFuture.isDone());
        Assert.assertTrue(this.leaderElectionService.isStopped());
    }

    @Test
    public void closeService_futureCompleteAfterRmTerminated() throws Exception {
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        // Keep the RM stuck in termination until the test releases it.
        this.rmFactoryBuilder.setTerminateConsumer(
                ignored -> blockOnFuture(finishRmTerminationFuture));
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertRmStarted();

        final CompletableFuture<?> closeServiceFuture = this.resourceManagerService.closeAsync();
        assertNotComplete(closeServiceFuture);

        finishRmTerminationFuture.complete(null);
        await(closeServiceFuture);
    }

    @Test
    public void deregisterApplication_leaderRmNotStarted() throws Exception {
        final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        this.rmFactoryBuilder.setInitializeConsumer(
                ignored -> {
                    // Signal that initialization has begun, then hold it open.
                    startRmInitializationFuture.complete(null);
                    blockOnFuture(finishRmInitializationFuture);
                });
        createAndStartResourceManager();

        this.leaderElectionService.isLeader(UUID.randomUUID());
        await(startRmInitializationFuture);

        final CompletableFuture<?> deregisterApplicationFuture =
                this.resourceManagerService.deregisterApplication(
                        ApplicationStatus.CANCELED, (String) null);
        assertNotComplete(deregisterApplicationFuture);

        finishRmInitializationFuture.complete(null);
        await(deregisterApplicationFuture);
    }

    @Test
    public void deregisterApplication_noLeaderRm() throws Exception {
        createAndStartResourceManager();

        // No leadership was granted; the call must still complete within TIMEOUT.
        await(
                this.resourceManagerService.deregisterApplication(
                        ApplicationStatus.CANCELED, (String) null));
    }

    @Test
    public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
        // Names of currently registered metrics, maintained by the registry callbacks below.
        final Set<String> registeredMetrics =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        final MetricRegistry trackingRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer(
                                (metric, name, group) -> {
                                    registeredMetrics.add(name);
                                })
                        .setUnregisterConsumer(
                                (metric, name, group) -> {
                                    registeredMetrics.remove(name);
                                })
                        .build();

        createResourceManager(trackingRegistry);
        this.resourceManagerService.start();
        Assert.assertEquals(0, registeredMetrics.size());

        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertRmStarted();

        final Set<String> expectedMetrics = new HashSet<>();
        expectedMetrics.add("numRegisteredTaskManagers");
        expectedMetrics.add("taskSlotsTotal");
        expectedMetrics.add("taskSlotsAvailable");
        Assert.assertTrue(
                "Expected RM to register leader metrics",
                registeredMetrics.containsAll(expectedMetrics));

        revokeLeadership();

        // After revocation none of the expected leader metrics may remain registered.
        final Set<String> stillRegistered = new HashSet<>(registeredMetrics);
        stillRegistered.retainAll(expectedMetrics);
        Assert.assertTrue(
                "Expected RM to unregister leader metrics", stillRegistered.isEmpty());

        this.leaderElectionService.isLeader(UUID.randomUUID());
        assertRmStarted();
        Assert.assertTrue(
                "Expected RM to re-register leader metrics",
                registeredMetrics.containsAll(expectedMetrics));
    }

    /**
     * Blocks the calling thread until {@code completableFuture} completes.
     *
     * <p>Any failure while waiting is turned into an {@link AssertionError} carrying the original
     * exception as its cause; the interrupt flag is restored if the wait was interrupted.
     *
     * @param completableFuture future to wait for
     */
    public static void blockOnFuture(CompletableFuture<?> completableFuture) {
        try {
            completableFuture.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new AssertionError("Unexpected failure while blocking on future", e);
        }
    }

    /**
     * Asserts that {@code completableFuture} does not complete within {@link #FAST_TIMEOUT}.
     *
     * @param completableFuture future expected to stay pending
     * @throws Exception if the future completed exceptionally or the wait was interrupted
     */
    private static void assertNotComplete(CompletableFuture<?> completableFuture) throws Exception {
        try {
            completableFuture.get(FAST_TIMEOUT.getSize(), FAST_TIMEOUT.getUnit());
            Assert.fail("Expected the future to still be incomplete");
        } catch (TimeoutException ignored) {
            // Expected: timing out is the signal that the future is still pending.
        }
    }

    /** Waits (up to {@link #TIMEOUT}) for the leader election service's confirmation future. */
    private void assertRmStarted() throws Exception {
        await(this.leaderElectionService.getConfirmationFuture());
    }

    /** Revokes leadership and blocks until the former leader RM's termination future completes. */
    private void revokeLeadership() {
        // Grab the leader RM before revoking so its termination future can be awaited afterwards.
        final ResourceManager<?> leaderResourceManager =
                this.resourceManagerService.getLeaderResourceManager();
        this.leaderElectionService.notLeader();
        blockOnFuture(leaderResourceManager.getTerminationFuture());
    }

    /**
     * Waits for {@code future} for at most {@link #TIMEOUT} and returns its result.
     *
     * @param future future to wait for
     * @return the value the future completed with
     * @throws Exception if the future fails, the wait is interrupted, or the timeout elapses
     */
    private static <T> T await(CompletableFuture<T> future) throws Exception {
        return future.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }
}
