package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerFactory;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.class */
/**
 * Tests for {@link ResourceManagerServiceImpl}.
 *
 * <p>Each test drives the service through a {@link TestingLeaderElection} (granting and revoking
 * leadership) and observes the resource manager lifecycle through callbacks installed on a
 * {@link TestingResourceManagerFactory.Builder}.
 */
class ResourceManagerServiceImplTest {
    private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
    private static final DelegationTokenManager delegationTokenManager = new NoOpDelegationTokenManager();
    private static final ClusterInformation clusterInformation = new ClusterInformation("localhost", 1234);
    private static final MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();

    // Shared by all tests in this class; created in setupClass().
    private static TestingRpcService rpcService;
    private static TestingHighAvailabilityServices haService;
    private static TestingFatalErrorHandler fatalErrorHandler;

    // Re-created for every test in setup().
    private TestingResourceManagerFactory.Builder rmFactoryBuilder;
    private TestingLeaderElection leaderElection;

    // Only set once a test calls createResourceManager(...); may stay null.
    private ResourceManagerServiceImpl resourceManagerService;

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
        haService = new TestingHighAvailabilityServices();
        fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @BeforeEach
    void setup() {
        fatalErrorHandler.clearError();
        rmFactoryBuilder = new TestingResourceManagerFactory.Builder();
        leaderElection = new TestingLeaderElection();
        haService.setResourceManagerLeaderElection(leaderElection);
    }

    @AfterEach
    void teardown() throws Exception {
        leaderElection.close();
        if (resourceManagerService != null) {
            resourceManagerService.close();
        }
        // Surface any error reported to the fatal error handler as a test failure.
        if (fatalErrorHandler.hasExceptionOccurred()) {
            fatalErrorHandler.rethrowError();
        }
    }

    @AfterAll
    static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /** Creates the service under test with the shared metric registry and starts it. */
    private void createAndStartResourceManager() throws Exception {
        createResourceManager();
        resourceManagerService.start();
    }

    /** Creates (but does not start) the service under test with the shared metric registry. */
    private void createResourceManager() throws Exception {
        createResourceManager(metricRegistry);
    }

    /**
     * Creates (but does not start) the service under test.
     *
     * @param registry metric registry handed to the service
     */
    private void createResourceManager(MetricRegistry registry) throws Exception {
        resourceManagerService =
                ResourceManagerServiceImpl.create(
                        rmFactoryBuilder.build(),
                        new Configuration(),
                        ResourceID.generate(),
                        rpcService,
                        haService,
                        heartbeatServices,
                        delegationTokenManager,
                        fatalErrorHandler,
                        clusterInformation,
                        null,
                        registry,
                        "localhost",
                        ForkJoinPool.commonPool());
    }

    @Test
    void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        createAndStartResourceManager();

        // grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId);

        // the RM is initialized with the granted session id, and that session is confirmed
        FlinkAssertions.assertThatFuture(startRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId);
    }

    @Test
    void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        // keep the RM initialization blocked until the test releases it
        rmFactoryBuilder.setInitializeConsumer(uuid -> blockOnFuture(finishRmInitializationFuture));

        createAndStartResourceManager();

        // grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId);

        // no confirmation while the initialization is still blocked
        assertNotComplete(confirmedLeaderInformation);

        // release the initialization
        finishRmInitializationFuture.complete(null);

        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId);
    }

    @Test
    void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // first initialization completes future 1, any later one future 2
                            if (startRmFuture1.isDone()) {
                                startRmFuture2.complete(uuid);
                            } else {
                                startRmFuture1.complete(uuid);
                            }
                        })
                .setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();

        // first time grant leadership
        leaderElection.isLeader(leaderSessionId1).join();

        // second time grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId2);

        // the first RM is terminated, a second one is initialized and its session confirmed
        FlinkAssertions.assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId1);
        FlinkAssertions.assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId2);
    }

    @Test
    void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // first initialization completes future 1, any later one future 2
                            if (startRmFuture1.isDone()) {
                                startRmFuture2.complete(uuid);
                            } else {
                                startRmFuture1.complete(uuid);
                            }
                        })
                // keep the RM termination blocked until the test releases it
                .setTerminateConsumer(uuid -> blockOnFuture(finishRmTerminationFuture));

        createAndStartResourceManager();

        // first time grant leadership
        leaderElection.isLeader(leaderSessionId1).join();

        // second time grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId2);

        // the new RM must not be initialized while the old one is still terminating
        assertNotComplete(startRmFuture2);

        // release the termination of the first RM
        finishRmTerminationFuture.complete(null);

        FlinkAssertions.assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId2);
    }

    @Test
    void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        // the service is created but deliberately not started
        createResourceManager();

        // grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(UUID.randomUUID());

        // neither an RM initialization nor a leader confirmation is observed
        assertNotComplete(startRmFuture);
        assertNotComplete(confirmedLeaderInformation);
    }

    @Test
    void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        createAndStartResourceManager();
        resourceManagerService.close();

        // grant leadership after the service has been closed
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(UUID.randomUUID());

        // neither an RM initialization nor a leader confirmation is observed
        assertNotComplete(startRmFuture);
        assertNotComplete(confirmedLeaderInformation);
    }

    @Test
    void revokeLeadership_stopExistLeader() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(leaderSessionId).join();

        // revoke leadership
        leaderElection.notLeader();

        // the RM that held the session is terminated
        FlinkAssertions.assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
    }

    @Test
    void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
        rmFactoryBuilder.setSupportMultiLeaderSession(false);

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(UUID.randomUUID()).join();

        // revoke leadership
        leaderElection.notLeader();

        // the service's termination future must complete; get() blocks until it does
        resourceManagerService.getTerminationFuture().get();
    }

    @Test
    void leaderRmTerminated_terminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> terminateRmFuture = new CompletableFuture<>();

        // make the RM report a termination future controlled by this test
        rmFactoryBuilder.setGetTerminationFutureFunction((ignore1, ignore2) -> terminateRmFuture);

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(leaderSessionId).join();

        // terminate the leader RM
        terminateRmFuture.complete(null);

        // the service's termination future must complete; get() blocks until it does
        resourceManagerService.getTerminationFuture().get();
    }

    @Test
    void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setTerminateConsumer(terminateRmFuture::complete)
                // make the RM report a termination future controlled by this test
                .setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(leaderSessionId).join();

        // revoke leadership
        leaderElection.notLeader();
        FlinkAssertions.assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);

        // complete the termination future of the RM that is no longer leader
        rmTerminationFuture.complete(null);

        // the service itself must stay alive
        assertNotComplete(resourceManagerService.getTerminationFuture());
    }

    @Test
    void closeService_stopRmAndLeaderElection() throws Exception {
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(UUID.randomUUID()).join();
        Assertions.assertThat(leaderElection.isStopped()).isFalse();

        // close service
        resourceManagerService.close();

        // both the RM and the leader election are stopped
        FlinkAssertions.assertThatFuture(terminateRmFuture).isDone();
        Assertions.assertThat(leaderElection.isStopped()).isTrue();
    }

    @Test
    void closeService_futureCompleteAfterRmTerminated() throws Exception {
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        // keep the RM termination blocked until the test releases it
        rmFactoryBuilder.setTerminateConsumer(uuid -> blockOnFuture(finishRmTerminationFuture));

        createAndStartResourceManager();

        // grant leadership
        leaderElection.isLeader(UUID.randomUUID()).join();

        // close service
        final CompletableFuture<?> closeServiceFuture = resourceManagerService.closeAsync();

        // closing must not finish while the RM termination is still blocked
        assertNotComplete(closeServiceFuture);

        // release the RM termination
        finishRmTerminationFuture.complete(null);

        closeServiceFuture.get();
    }

    @Test
    void deregisterApplication_leaderRmNotStarted() throws Exception {
        final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(
                uuid -> {
                    // signal that initialization has begun, then block until released
                    startRmInitializationFuture.complete(null);
                    blockOnFuture(finishRmInitializationFuture);
                });

        createAndStartResourceManager();

        // grant leadership and wait until the RM initialization has begun
        leaderElection.isLeader(UUID.randomUUID());
        startRmInitializationFuture.get();

        final CompletableFuture<?> deregisterApplicationFuture =
                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);

        // deregistration must not finish while the initialization is still blocked
        assertNotComplete(deregisterApplicationFuture);

        // release the initialization
        finishRmInitializationFuture.complete(null);

        FlinkAssertions.assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
    }

    @Test
    void deregisterApplication_noLeaderRm() throws Exception {
        createAndStartResourceManager();

        // leadership is never granted in this test
        final CompletableFuture<?> deregisterApplicationFuture =
                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);

        FlinkAssertions.assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
    }

    @Test
    void grantAndRevokeLeadership_verifyMetrics() throws Exception {
        // names of currently registered metrics; concurrent set because the registry
        // callbacks may run on a different thread than the test
        final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
        final MetricRegistry trackingRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer((metric, name, group) -> registeredMetrics.add(name))
                        .setUnregisterConsumer((metric, name, group) -> registeredMetrics.remove(name))
                        .build();

        createResourceManager(trackingRegistry);
        resourceManagerService.start();

        Assertions.assertThat(registeredMetrics).isEmpty();

        // grant leadership
        leaderElection.isLeader(UUID.randomUUID()).join();

        final Set<String> expectedMetrics =
                Sets.set("numRegisteredTaskManagers", "taskSlotsTotal", "taskSlotsAvailable");
        Assertions.assertThat(registeredMetrics)
                .as("Expected RM to register leader metrics")
                .containsAll(expectedMetrics);

        // revoke leadership and wait for the former leader RM to terminate
        revokeLeadership();

        final Set<String> stillRegistered = new HashSet<>(registeredMetrics);
        stillRegistered.retainAll(expectedMetrics);
        Assertions.assertThat(stillRegistered)
                .as("Expected RM to unregister leader metrics")
                .isEmpty();

        // grant leadership again
        leaderElection.isLeader(UUID.randomUUID()).join();

        Assertions.assertThat(registeredMetrics)
                .as("Expected RM to re-register leader metrics")
                .containsAll(expectedMetrics);
    }

    /**
     * Blocks the calling thread until {@code completableFuture} completes, failing the test if
     * waiting throws.
     *
     * @param completableFuture future to wait for
     */
    public static void blockOnFuture(CompletableFuture<?> completableFuture) {
        try {
            completableFuture.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // restore the interrupt flag for whoever owns this thread
                Thread.currentThread().interrupt();
            }
            // attach the cause to the failure instead of printing it to stderr
            org.junit.jupiter.api.Assertions.fail("Waiting for the future failed.", e);
        }
    }

    /** Asserts that {@code completableFuture} does not complete within 50 milliseconds. */
    private static void assertNotComplete(CompletableFuture<?> completableFuture) {
        FlinkAssertions.assertThatFuture(completableFuture)
                .failsWithin(50L, TimeUnit.MILLISECONDS)
                .withThrowableOfType(TimeoutException.class);
    }

    /** Revokes leadership and blocks until the former leader resource manager has terminated. */
    private void revokeLeadership() {
        // grab the leader RM before revoking; it is looked up as the current leader
        final ResourceManager<?> leaderResourceManager =
                resourceManagerService.getLeaderResourceManager();
        leaderElection.notLeader();
        blockOnFuture(leaderResourceManager.getTerminationFuture());
    }
}
