package org.apache.flink.runtime.resourcemanager;

import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.class */
/**
 * Tests for the interaction between a resource manager and a job master: registration of a job
 * master (with matching and non-matching fencing tokens, from an unreachable address, and for a
 * job id that has no leader retrieval service) and propagation of a task manager disconnect.
 *
 * <p>Each test runs against a fresh {@link TestingRpcService} and a freshly started
 * {@link TestingResourceManagerService} that has been granted leadership in {@link #setup()}.
 *
 * <p>NOTE(review): {@code m242getFencingToken()} looks like a decompiler-renamed accessor for the
 * gateway's fencing token; the name is kept as-is because it is declared outside this file.
 */
class ResourceManagerJobMasterTest {
    /** Upper bound used both as RPC timeout and as the wait limit for most future assertions. */
    private static final Duration TIMEOUT = Duration.ofSeconds(10);

    private TestingRpcService rpcService;
    private JobID jobId;
    private ResourceID jobMasterResourceId;
    private TestingJobMasterGateway jobMasterGateway;
    private SettableLeaderRetrievalService jobMasterLeaderRetrievalService;
    private TestingResourceManagerService resourceManagerService;
    private ResourceManagerGateway resourceManagerGateway;

    /** Creates the RPC service, the job master gateway and a leading resource manager. */
    @BeforeEach
    void setup() throws Exception {
        rpcService = new TestingRpcService();
        jobId = new JobID();
        jobMasterResourceId = ResourceID.generate();
        createAndRegisterJobMasterGateway();
        createAndStartResourceManagerService();
    }

    /**
     * Builds the default job master gateway, registers it with the RPC service under its own
     * address, and creates a leader retrieval service announcing that address and fencing token.
     */
    private void createAndRegisterJobMasterGateway() {
        jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        jobMasterLeaderRetrievalService =
                new SettableLeaderRetrievalService(
                        jobMasterGateway.getAddress(),
                        jobMasterGateway.m242getFencingToken().toUUID());
    }

    /**
     * Builds and starts the resource manager service, grants it leadership and caches its
     * gateway. The job master leader retriever only knows {@link #jobId}; any other job id makes
     * the retriever function throw.
     */
    private void createAndStartResourceManagerService() throws Exception {
        resourceManagerService =
                TestingResourceManagerService.newBuilder()
                        .setRpcService(rpcService)
                        .setJmLeaderRetrieverFunction(
                                requestedJobId -> {
                                    if (requestedJobId.equals(jobId)) {
                                        return jobMasterLeaderRetrievalService;
                                    }
                                    // Report the id that was actually requested, not the known one.
                                    throw new FlinkRuntimeException(
                                            String.format("Unknown job id %s", requestedJobId));
                                })
                        .setRmLeaderElection(new TestingLeaderElection())
                        .build();
        resourceManagerService.start();
        // Block until the leadership grant has been processed.
        resourceManagerService.isLeader(UUID.randomUUID()).join();
        resourceManagerGateway = leaderResourceManagerGateway();
    }

    /**
     * Returns the gateway of the resource manager held by {@link #resourceManagerService}.
     *
     * @throws AssertionError if the service does not expose a gateway
     */
    private ResourceManagerGateway leaderResourceManagerGateway() {
        return resourceManagerService
                .getResourceManagerGateway()
                .orElseThrow(
                        () -> new AssertionError("RM not available after confirming leadership."));
    }

    /** Surfaces fatal errors recorded by the resource manager, then releases all resources. */
    @AfterEach
    void teardown() throws Exception {
        if (resourceManagerService != null) {
            resourceManagerService.rethrowFatalErrorIfAny();
            resourceManagerService.cleanUp();
        }
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /** Registering the known job master with its own fencing token must succeed. */
    @Test
    void testRegisterJobMaster() {
        final CompletableFuture<RegistrationResponse> registration =
                resourceManagerGateway.registerJobMaster(
                        jobMasterGateway.m242getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        FlinkAssertions.assertThatFuture(registration)
                .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
                .isInstanceOf(JobMasterRegistrationSuccess.class);
    }

    /**
     * Disconnecting a registered task manager at the resource manager must invoke the disconnect
     * callbacks of both the task executor gateway and the registered job master gateway.
     */
    @Test
    void testDisconnectTaskManagerInResourceManager()
            throws ExecutionException, InterruptedException, TimeoutException {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<Exception> taskExecutorDisconnectFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceID> jobMasterDisconnectFuture = new CompletableFuture<>();

        // A second job master gateway that records which task manager it is told to disconnect.
        final TestingJobMasterGateway recordingJobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setDisconnectTaskManagerFunction(
                                resourceID -> {
                                    jobMasterDisconnectFuture.complete(resourceID);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setAddress("pekko.tcp://flink@localhost:6130/user/jobmanager2")
                        .build();
        rpcService.registerGateway(
                recordingJobMasterGateway.getAddress(), recordingJobMasterGateway);

        final ResourceManagerGateway rmGateway = leaderResourceManagerGateway();

        final CompletableFuture<RegistrationResponse> registration =
                rmGateway.registerJobMaster(
                        recordingJobMasterGateway.m242getFencingToken(),
                        jobMasterResourceId,
                        recordingJobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);
        FlinkAssertions.assertThatFuture(registration)
                .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
                .isInstanceOf(JobMasterRegistrationSuccess.class);

        // Task executor gateway that records the cause it is disconnected with.
        final RpcGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .setDisconnectResourceManagerConsumer(
                                taskExecutorDisconnectFuture::complete)
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
        ResourceManagerPartitionLifecycleTest.registerTaskExecutor(
                rmGateway, taskExecutorId, taskExecutorGateway.getAddress());

        rmGateway.disconnectTaskManager(taskExecutorId, new Exception("for test"));

        FlinkAssertions.assertThatFuture(taskExecutorDisconnectFuture)
                .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(
                        jobMasterDisconnectFuture.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))
                .isEqualTo(taskExecutorId);
    }

    /**
     * A registration sent through a gateway connected with a freshly generated (hence wrong)
     * resource manager fencing token must fail with a {@link FencingTokenException}.
     */
    @Test
    void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
        final ResourceManagerGateway wronglyFencedGateway =
                rpcService
                        .connect(
                                resourceManagerGateway.getAddress(),
                                ResourceManagerId.generate(),
                                ResourceManagerGateway.class)
                        .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);

        final CompletableFuture<RegistrationResponse> registration =
                wronglyFencedGateway.registerJobMaster(
                        jobMasterGateway.m242getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        FlinkAssertions.assertThatFuture(registration)
                .withFailMessage("Should fail because we are using the wrong fencing token.")
                .failsWithin(5L, TimeUnit.SECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(FencingTokenException.class);
    }

    /**
     * A registration carrying a freshly generated job master id (instead of the gateway's own
     * fencing token) must complete with a {@link RegistrationResponse.Failure}.
     */
    @Test
    void testRegisterJobMasterWithUnmatchedLeaderSessionId2() {
        final CompletableFuture<RegistrationResponse> registration =
                resourceManagerGateway.registerJobMaster(
                        JobMasterId.generate(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        FlinkAssertions.assertThatFuture(registration)
                .eventuallySucceeds()
                .isInstanceOf(RegistrationResponse.Failure.class);
    }

    /**
     * A registration from an address for which no gateway was registered with the RPC service
     * must complete with a {@link RegistrationResponse.Failure}.
     */
    @Test
    void testRegisterJobMasterFromInvalidAddress() {
        final String unregisteredAddress = "/jobMasterAddress2";

        final CompletableFuture<RegistrationResponse> registration =
                resourceManagerGateway.registerJobMaster(
                        new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
                        jobMasterResourceId,
                        unregisteredAddress,
                        jobId,
                        TIMEOUT);

        FlinkAssertions.assertThatFuture(registration)
                .succeedsWithin(5L, TimeUnit.SECONDS)
                .isOfAnyClassIn(RegistrationResponse.Failure.class);
    }

    /**
     * A registration for a job id unknown to the leader retriever function must fail with a
     * {@link ResourceManagerException}.
     */
    @Test
    void testRegisterJobMasterWithFailureLeaderListener() {
        // The retriever function configured in setup throws for any id other than jobId.
        final JobID unknownJobId = new JobID();

        final CompletableFuture<RegistrationResponse> registration =
                resourceManagerGateway.registerJobMaster(
                        jobMasterGateway.m242getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        unknownJobId,
                        TIMEOUT);

        FlinkAssertions.assertThatFuture(registration)
                .as("Expected to fail with a ResourceManagerException.")
                .failsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(ResourceManagerException.class);

        // Keep teardown from rethrowing any fatal error recorded during this test.
        resourceManagerService.ignoreFatalErrors();
    }
}
