package org.apache.flink.runtime.taskexecutor;

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/** Tests for the {@link DefaultJobLeaderService}. */
public class DefaultJobLeaderServiceTest extends TestLogger {

    /**
     * JUnit rule supplying the testing RPC service that the tests in this class obtain through
     * {@code getTestingRpcService()}, both to start the job leader service and to register
     * job master gateways.
     */
    @Rule
    public final TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();

    /**
     * {@link SettableLeaderRetrievalService} whose {@link #stop()} unconditionally fails with a
     * {@link FlinkException}; used to exercise the failure path when a job is removed.
     */
    private static final class FailingSettableLeaderRetrievalService
            extends SettableLeaderRetrievalService {

        private FailingSettableLeaderRetrievalService() {}

        /**
         * Always fails instead of stopping the service.
         *
         * @throws FlinkException on every invocation
         */
        @Override
        public void stop() throws FlinkException {
            throw new FlinkException("Test exception");
        }
    }

    /**
     * {@link JobLeaderListener} for tests. Gained-leadership and rejected-registration
     * notifications are forwarded (job id only) to the consumers supplied at construction time,
     * lost-leadership notifications count down an internal latch, and reported errors are ignored.
     */
    private static final class TestingJobLeaderListener implements JobLeaderListener {

        /** Released by the first lost-leadership notification. */
        private final CountDownLatch jobManagerLostLeadership = new CountDownLatch(1);

        /** Receives the job id of every gained-leadership notification. */
        private final Consumer<JobID> jobManagerGainedLeadership;

        /** Receives the job id of every rejected-registration notification. */
        private final Consumer<JobID> jobManagerRejectedRegistrationConsumer;

        /** Creates a listener that ignores gained-leadership and rejection notifications. */
        private TestingJobLeaderListener() {
            this(ignored -> {});
        }

        /**
         * Creates a listener that ignores rejection notifications.
         *
         * @param gainedLeadershipConsumer called with the job id when leadership is gained
         */
        private TestingJobLeaderListener(Consumer<JobID> gainedLeadershipConsumer) {
            this(gainedLeadershipConsumer, ignored -> {});
        }

        /**
         * @param gainedLeadershipConsumer called with the job id when leadership is gained
         * @param rejectedRegistrationConsumer called with the job id when a registration is
         *     rejected
         */
        private TestingJobLeaderListener(
                Consumer<JobID> gainedLeadershipConsumer,
                Consumer<JobID> rejectedRegistrationConsumer) {
            this.jobManagerGainedLeadership = gainedLeadershipConsumer;
            this.jobManagerRejectedRegistrationConsumer = rejectedRegistrationConsumer;
        }

        public void jobManagerGainedLeadership(
                JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
            jobManagerGainedLeadership.accept(jobId);
        }

        public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
            jobManagerLostLeadership.countDown();
        }

        public void handleError(Throwable throwable) {
            // errors are deliberately ignored by this test listener
        }

        public void jobManagerRejectedRegistration(
                JobID jobId, String targetAddress, JMTMRegistrationRejection rejection) {
            jobManagerRejectedRegistrationConsumer.accept(jobId);
        }

        /**
         * Blocks until a lost-leadership notification has been received.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void waitUntilJobManagerLostLeadership() throws InterruptedException {
            jobManagerLostLeadership.await();
        }
    }

    /**
     * Adds and removes jobs on a background thread while the test thread concurrently delivers
     * leader notifications to the leader retrieval services created for those jobs. The test
     * passes when neither thread fails.
     *
     * @throws Exception if the background thread failed or the test thread was interrupted
     */
    @Test
    public void handlesConcurrentJobAdditionsAndLeaderChanges() throws Exception {
        final int numberOperations = 20;

        final DefaultJobLeaderService jobLeaderService =
                new DefaultJobLeaderService(
                        new LocalUnresolvedTaskManagerLocation(),
                        RetryingRegistrationConfiguration.defaultConfiguration());
        final TestingJobLeaderListener jobLeaderListener = new TestingJobLeaderListener();

        // Every leader retrieval service handed out by the HA services is also queued here so
        // that the test thread can send it a leader notification.
        final BlockingQueue<SettableLeaderRetrievalService> instantiatedLeaderRetrievalServices =
                new ArrayBlockingQueue<>(numberOperations);
        final HighAvailabilityServices haServices =
                new TestingHighAvailabilityServicesBuilder()
                        .setJobMasterLeaderRetrieverFunction(
                                ignored -> {
                                    final SettableLeaderRetrievalService leaderRetrievalService =
                                            new SettableLeaderRetrievalService();
                                    instantiatedLeaderRetrievalServices.offer(leaderRetrievalService);
                                    return leaderRetrievalService;
                                })
                        .build();

        jobLeaderService.start(
                "foobar", this.rpcServiceResource.getTestingRpcService(), haServices, jobLeaderListener);

        try {
            final CheckedThread addJobAction =
                    new CheckedThread() {
                        public void go() throws Exception {
                            for (int i = 0; i < numberOperations; i++) {
                                final JobID jobId = JobID.generate();
                                jobLeaderService.addJob(jobId, "foobar");
                                // give the notifying thread a chance to interleave
                                Thread.yield();
                                jobLeaderService.removeJob(jobId);
                            }
                        }
                    };
            addJobAction.start();

            for (int i = 0; i < numberOperations; i++) {
                final SettableLeaderRetrievalService leaderRetrievalService =
                        instantiatedLeaderRetrievalServices.take();
                leaderRetrievalService.notifyListener("foobar", UUID.randomUUID());
            }

            // rethrows any failure of the background thread
            addJobAction.sync();
        } finally {
            // stop the service like the other tests in this class do, so it does not outlive the test
            jobLeaderService.stop();
        }
    }

    /**
     * Announces a job master leader, waits until the listener observed the gained leadership,
     * revokes the leader, and then calls {@code reconnect} for the job. The test has no explicit
     * assertion: it passes when the {@code reconnect} call completes without throwing.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test
    public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
        final JobID jobId = new JobID();

        final SettableLeaderRetrievalService leaderRetrievalService =
                new SettableLeaderRetrievalService();
        final HighAvailabilityServices haServices =
                new TestingHighAvailabilityServicesBuilder()
                        .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
                        .build();
        final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

        final OneShotLatch jobManagerGainedLeadership = new OneShotLatch();
        final TestingJobLeaderListener testingJobLeaderListener =
                new TestingJobLeaderListener(ignored -> jobManagerGainedLeadership.trigger());

        final JobLeaderService jobLeaderService =
                createAndStartJobLeaderService(haServices, testingJobLeaderListener);

        try {
            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());
            jobManagerGainedLeadership.await();

            // revoke the leadership
            leaderRetrievalService.notifyListener(null, null);
            testingJobLeaderListener.waitUntilJobManagerLostLeadership();

            // must complete without throwing
            jobLeaderService.reconnect(jobId);
        } finally {
            jobLeaderService.stop();
        }
    }

    /**
     * Announces a leader, revokes it, and then announces the very same leader (same address and
     * same leader session id) again. The listener must be told about the gained leadership for
     * the job both times.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test
    public void canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
        final JobID jobId = new JobID();

        final SettableLeaderRetrievalService leaderRetrievalService =
                new SettableLeaderRetrievalService();
        final HighAvailabilityServices haServices =
                new TestingHighAvailabilityServicesBuilder()
                        .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
                        .build();
        final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

        // receives the job id of each gained-leadership notification
        final BlockingQueue<JobID> leadershipQueue = new ArrayBlockingQueue<>(1);
        final TestingJobLeaderListener testingJobLeaderListener =
                new TestingJobLeaderListener(leadershipQueue::offer);

        final JobLeaderService jobLeaderService =
                createAndStartJobLeaderService(haServices, testingJobLeaderListener);

        try {
            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

            final UUID leaderSessionId = UUID.randomUUID();
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
            Assert.assertThat(leadershipQueue.take(), CoreMatchers.is(jobId));

            // revoke the leadership
            leaderRetrievalService.notifyListener(null, null);
            testingJobLeaderListener.waitUntilJobManagerLostLeadership();

            // re-announce the identical leader; the listener must be notified again
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
            Assert.assertThat(leadershipQueue.take(), CoreMatchers.is(jobId));
        } finally {
            jobLeaderService.stop();
        }
    }

    /**
     * Adds and removes a job whose leader retrieval service throws from {@code stop()}, then
     * sends a leader notification through that service. The listener's gained-leadership
     * callback must not fire within the wait window of 10 milliseconds.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test
    public void removeJobWithFailingLeaderRetrievalServiceStopWillStopListeningToLeaderNotifications()
            throws Exception {
        final FailingSettableLeaderRetrievalService leaderRetrievalService =
                new FailingSettableLeaderRetrievalService();
        final TestingHighAvailabilityServices haServices =
                new TestingHighAvailabilityServicesBuilder()
                        .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
                        .build();

        final JobID jobId = new JobID();

        // completed by the listener if it is (unexpectedly) told about a new leader
        final CompletableFuture<JobID> newLeaderFuture = new CompletableFuture<>();
        final TestingJobLeaderListener testingJobLeaderListener =
                new TestingJobLeaderListener(newLeaderFuture::complete);

        final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

        final JobLeaderService jobLeaderService =
                createAndStartJobLeaderService(haServices, testingJobLeaderListener);

        try {
            jobLeaderService.addJob(jobId, "foobar");
            jobLeaderService.removeJob(jobId);

            leaderRetrievalService.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            try {
                newLeaderFuture.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("The leader future should not be completed.");
            } catch (TimeoutException expected) {
                // expected: the removed job must not react to leader notifications
            }
        } finally {
            jobLeaderService.stop();
        }
    }

    /**
     * Registers a job master gateway whose task-manager registration function always answers
     * with a {@link JMTMRegistrationRejection}, announces it as leader, and verifies that the
     * listener's rejected-registration callback is invoked with the job's id.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test
    public void rejectedJobManagerRegistrationCallsJobLeaderListener() throws Exception {
        final SettableLeaderRetrievalService leaderRetrievalService =
                new SettableLeaderRetrievalService();
        final TestingHighAvailabilityServices haServices =
                new TestingHighAvailabilityServicesBuilder()
                        .setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
                        .build();

        final JobID jobId = new JobID();

        // completed with the job id once the listener is told about the rejection
        final CompletableFuture<JobID> rejectedRegistrationFuture = new CompletableFuture<>();
        final TestingJobLeaderListener testingJobLeaderListener =
                new TestingJobLeaderListener(ignored -> {}, rejectedRegistrationFuture::complete);

        // The gateway is set up before the service is started so that nothing which can fail
        // sits between starting the service and entering the try/finally that stops it.
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setRegisterTaskManagerFunction(
                                (ignoredA, ignoredB, ignoredC) ->
                                        CompletableFuture.completedFuture(
                                                new JMTMRegistrationRejection("foobar")))
                        .build();
        this.rpcServiceResource
                .getTestingRpcService()
                .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final JobLeaderService jobLeaderService =
                createAndStartJobLeaderService(haServices, testingJobLeaderListener);

        try {
            jobLeaderService.addJob(jobId, "foobar");

            leaderRetrievalService.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            Assert.assertThat(rejectedRegistrationFuture.get(), CoreMatchers.is(jobId));
        } finally {
            jobLeaderService.stop();
        }
    }

    /**
     * Builds a {@link DefaultJobLeaderService} with the default retrying-registration
     * configuration and starts it under the address {@code "foobar"} on the testing RPC service.
     *
     * @param highAvailabilityServices HA services passed to the started service
     * @param jobLeaderListener listener passed to the started service
     * @return the started job leader service; the caller is responsible for stopping it
     */
    private JobLeaderService createAndStartJobLeaderService(
            HighAvailabilityServices highAvailabilityServices, JobLeaderListener jobLeaderListener) {
        final LocalUnresolvedTaskManagerLocation ownLocation =
                new LocalUnresolvedTaskManagerLocation();
        final RetryingRegistrationConfiguration retryConfiguration =
                RetryingRegistrationConfiguration.defaultConfiguration();

        final DefaultJobLeaderService startedService =
                new DefaultJobLeaderService(ownLocation, retryConfiguration);
        startedService.start(
                "foobar",
                rpcServiceResource.getTestingRpcService(),
                highAvailabilityServices,
                jobLeaderListener);
        return startedService;
    }

    /**
     * Builds a {@link TestingJobMasterGateway} with the builder's defaults and registers it with
     * the testing RPC service under the gateway's own address.
     *
     * @return the registered gateway
     */
    private TestingJobMasterGateway registerJobMaster() {
        // Typed as TestingJobMasterGateway (not RpcGateway) so it matches the declared return type.
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        this.rpcServiceResource
                .getTestingRpcService()
                .registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        return jobMasterGateway;
    }
}
