package org.apache.flink.runtime.leaderelection;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.class */
/**
 * {@link LeaderElectionService} implementation that shares a single {@link LeaderElectionDriver}
 * between all {@link LeaderContender}s registered under distinct component IDs.
 *
 * <p>The driver is created lazily when the first contender is registered and handed back for
 * closing when the last contender is removed. All mutable state is guarded by {@code lock}.
 * Leadership grant/revoke events coming from the driver are not processed in the calling thread
 * but are submitted to {@code leadershipOperationExecutor}.
 */
public class DefaultLeaderElectionService extends DefaultLeaderElection.ParentService implements LeaderElectionService, LeaderElectionDriver.Listener, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
    private static final String LEADER_ACQUISITION_EVENT_LOG_NAME = "Leader Acquisition";
    private static final String LEADER_REVOCATION_EVENT_LOG_NAME = "Leader Revocation";

    /** Monitor guarding every {@code @GuardedBy("lock")} field of this instance. */
    private final Object lock;

    /** Factory used to lazily create the driver on first contender registration. */
    private final LeaderElectionDriverFactory leaderElectionDriverFactory;

    /** Registered contenders, keyed by their component ID. */
    @GuardedBy("lock")
    private final Map<String, LeaderContender> leaderContenderRegistry;

    /** Session ID handed out with the current leadership; {@code null} while not being leader. */
    @GuardedBy("lock")
    @Nullable
    private UUID issuedLeaderSessionID;

    /** Leader information that was confirmed by the contenders, per component ID. */
    @GuardedBy("lock")
    private LeaderInformationRegister confirmedLeaderInformation;

    /** {@code true} from construction until {@link #close()} was called. */
    @GuardedBy("lock")
    private boolean running;

    /** Driver instance; {@code null} while no contender is registered. */
    @GuardedBy("lock")
    private LeaderElectionDriver leaderElectionDriver;

    /** Executor on which leadership events and leadership checks/confirmations are processed. */
    private final ExecutorService leadershipOperationExecutor;

    /** Receives errors that are reported while no contender is registered. */
    private final FatalErrorHandler fallbackErrorHandler;

    /**
     * Creates a service whose fallback error handler only logs (on debug level) that the error
     * is ignored.
     *
     * @param leaderElectionDriverFactory factory for the driver; must not be {@code null}
     */
    public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) {
        this(leaderElectionDriverFactory, th -> {
            LOG.debug("Ignoring error notification since there's no contender registered.");
        });
    }

    /**
     * Creates a service that processes its leadership operations on a dedicated single-threaded
     * executor.
     *
     * @param leaderElectionDriverFactory factory for the driver; must not be {@code null}
     * @param fatalErrorHandler handler used for errors while no contender is registered; must
     *     not be {@code null}
     */
    @VisibleForTesting
    public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory, FatalErrorHandler fatalErrorHandler) {
        this(leaderElectionDriverFactory, fatalErrorHandler, Executors.newSingleThreadExecutor(new ExecutorThreadFactory("DefaultLeaderElectionService-leadershipOperationExecutor")));
    }

    /**
     * Creates a service with an explicitly provided executor. The service starts in running
     * state, without a driver and without leadership.
     *
     * @param leaderElectionDriverFactory factory for the driver; must not be {@code null}
     * @param fatalErrorHandler handler used for errors while no contender is registered; must
     *     not be {@code null}
     * @param executorService executor for leadership operations; must not be {@code null}. It is
     *     shut down by {@link #close()}.
     */
    @VisibleForTesting
    DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory, FatalErrorHandler fatalErrorHandler, ExecutorService executorService) {
        this.lock = new Object();
        this.leaderContenderRegistry = new HashMap<>();
        this.leaderElectionDriverFactory = Preconditions.checkNotNull(leaderElectionDriverFactory);
        this.fallbackErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.issuedLeaderSessionID = null;
        this.leaderElectionDriver = null;
        this.confirmedLeaderInformation = LeaderInformationRegister.empty();
        this.leadershipOperationExecutor = Preconditions.checkNotNull(executorService);
        this.running = true;
    }

    /**
     * Creates a {@link LeaderElection} handle for the given component ID.
     *
     * @param str the component ID the handle is created for
     * @return a new {@link DefaultLeaderElection} bound to this service
     * @throws IllegalStateException if the executor was already shut down or a contender is
     *     already registered under the component ID
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionService
    public LeaderElection createLeaderElection(String str) {
        synchronized (this.lock) {
            Preconditions.checkState(!this.leadershipOperationExecutor.isShutdown(), "The service was already closed and cannot be reused.");
            Preconditions.checkState(!this.leaderContenderRegistry.containsKey(str), "There shouldn't be any contender registered under the passed component '%s'.", str);
            return new DefaultLeaderElection(this, str);
        }
    }

    /**
     * Instantiates the driver through the factory, passing this service as the listener.
     *
     * @throws Exception if the factory fails to create the driver
     */
    @GuardedBy("lock")
    private void createLeaderElectionDriver() throws Exception {
        Preconditions.checkState(this.leaderContenderRegistry.isEmpty(), "No LeaderContender should have been registered, yet.");
        Preconditions.checkState(this.leaderElectionDriver == null, "This DefaultLeaderElectionService cannot be reused. Calling startLeaderElectionBackend can only be called once to establish the connection to the HA backend.");
        this.leaderElectionDriver = this.leaderElectionDriverFactory.create(this);
        LOG.info("A connection to the HA backend was established through LeaderElectionDriver {}.", this.leaderElectionDriver);
    }

    /**
     * Registers a contender under the given component ID, creating the driver if this is the
     * first registration. If leadership is currently held, a grant notification for the new
     * contender is scheduled on the leadership executor.
     *
     * @param str the component ID; must not be {@code null}
     * @param leaderContender the contender; must not be {@code null}
     * @throws IllegalStateException if the service is not running or the component ID is
     *     already in use
     * @throws Exception if the driver could not be created
     */
    @Override // org.apache.flink.runtime.leaderelection.DefaultLeaderElection.ParentService
    protected void register(String str, LeaderContender leaderContender) throws Exception {
        Preconditions.checkNotNull(str, "componentId must not be null.");
        Preconditions.checkNotNull(leaderContender, "Contender must not be null.");
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
            if (this.leaderElectionDriver == null) {
                createLeaderElectionDriver();
            }
            Preconditions.checkState(this.leaderContenderRegistry.put(str, leaderContender) == null, "There shouldn't be any contender registered under the passed component '%s'.", str);
            LOG.info("LeaderContender has been registered under component '{}' for {}.", str, this.leaderElectionDriver);

            // Capture the session ID now instead of re-reading the field when the event runs:
            // if a revocation is processed in between, the field is null and the notification
            // would fail with a NullPointerException. With the captured value the event is
            // recognized as out-dated and skipped.
            final UUID sessionIdAtRegistration = this.issuedLeaderSessionID;
            if (sessionIdAtRegistration != null) {
                runInLeaderEventThread(LEADER_ACQUISITION_EVENT_LOG_NAME, () -> notifyLeaderContenderOfLeadership(str, sessionIdAtRegistration));
            }
        }
    }

    /**
     * Deregisters the contender of the given component ID. If leadership is held, the contender
     * is notified about the leadership loss and, if the driver still reports leadership, its
     * leader information is deleted. When the last contender is removed, the driver is detached
     * from the service and closed.
     *
     * @param str the component ID of the contender to remove
     * @throws Exception if closing the driver fails
     */
    @Override // org.apache.flink.runtime.leaderelection.DefaultLeaderElection.ParentService
    protected final void remove(String str) throws Exception {
        final AutoCloseable driverToClose;
        synchronized (this.lock) {
            if (!this.leaderContenderRegistry.containsKey(str)) {
                LOG.debug("There is no contender registered under component '{}' anymore. No action necessary.", str);
                return;
            }
            Preconditions.checkState(this.leaderElectionDriver != null, "The LeaderElectionDriver should be instantiated.");
            LOG.info("Deregistering contender with component '{}' from the DefaultLeaderElectionService.", str);
            final LeaderContender removedContender = this.leaderContenderRegistry.remove(str);
            Preconditions.checkNotNull(removedContender, "There should be a LeaderContender registered under the given component '%s'.", str);
            if (this.issuedLeaderSessionID != null) {
                notifyLeaderContenderOfLeadershipLoss(str, removedContender);
                LOG.debug("The contender associated with component '{}' is deregistered while the service has the leadership acquired. The revoke event is forwarded to the LeaderContender.", str);
                if (this.leaderElectionDriver.hasLeadership()) {
                    this.leaderElectionDriver.deleteLeaderInformation(str);
                    LOG.debug("Leader information is cleaned up while deregistering the contender for component '{}' from the service.", str);
                }
            } else {
                Preconditions.checkState(this.confirmedLeaderInformation.hasNoLeaderInformation(), "The confirmed leader information should have been cleared during leadership revocation.");
                LOG.debug("Contender associated with component '{}' is deregistered while the service doesn't have the leadership acquired. No cleanup necessary.", str);
            }
            driverToClose = this.leaderContenderRegistry.isEmpty() ? deregisterDriver() : null;
        }

        // The driver is closed outside of the lock on purpose: the listener callbacks of this
        // class acquire the same lock, so closing while holding it could deadlock if close()
        // waits for in-flight callbacks.
        if (driverToClose != null) {
            driverToClose.close();
        }
    }

    /**
     * Detaches the driver from this service and resets the issued session ID.
     *
     * @return the detached driver, which the caller is expected to close
     */
    @GuardedBy("lock")
    private AutoCloseable deregisterDriver() {
        Preconditions.checkState(this.leaderContenderRegistry.isEmpty(), "No contender should be registered when deregistering the driver.");
        Preconditions.checkState(this.leaderElectionDriver != null, "There should be a driver instantiated that's ready to be closed.");
        this.issuedLeaderSessionID = null;
        final LeaderElectionDriver detachedDriver = this.leaderElectionDriver;
        this.leaderElectionDriver = null;
        return detachedDriver;
    }

    /**
     * Stops the service and shuts down the leadership executor. Calling it again after the
     * service was stopped is a no-op.
     *
     * @throws IllegalStateException if contenders are still registered or the driver is still
     *     instantiated
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(this.leaderContenderRegistry.isEmpty(), "The DefaultLeaderElectionService should have been stopped before closing the instance.");
            Preconditions.checkState(this.leaderElectionDriver == null, "The driver should have been closed.");
            if (!this.running) {
                LOG.debug("The HA backend connection isn't established. No actions taken.");
                return;
            }
            this.running = false;
            final List<Runnable> unprocessedEvents = this.leadershipOperationExecutor.shutdownNow();
            if (!unprocessedEvents.isEmpty()) {
                LOG.debug("The DefaultLeaderElectionService was closed with {} event(s) still not being processed. No further action necessary.", unprocessedEvents.size());
            }
        }
    }

    /**
     * Confirms the leadership for the given component asynchronously on the leadership
     * executor. If the service still holds the leadership under the passed session ID, the
     * leader information is stored locally and published through the driver; otherwise the
     * confirmation is only logged.
     *
     * @param str the component ID; a contender must be registered under it
     * @param uuid the leader session ID being confirmed; must not be {@code null}
     * @param str2 the leader address to publish
     * @return future that completes once the confirmation was processed
     * @throws IllegalArgumentException if no contender is registered under the component ID
     */
    @Override // org.apache.flink.runtime.leaderelection.DefaultLeaderElection.ParentService
    protected CompletableFuture<Void> confirmLeadershipAsync(String str, UUID uuid, String str2) {
        // The registry is a plain HashMap guarded by the lock; it must not be read without it.
        synchronized (this.lock) {
            Preconditions.checkArgument(this.leaderContenderRegistry.containsKey(str));
        }
        LOG.debug("The leader session for component '{}' is confirmed with session ID {} and address {}.", str, uuid, str2);
        Preconditions.checkNotNull(uuid);
        return CompletableFuture.runAsync(() -> {
            synchronized (this.lock) {
                if (hasLeadershipInternal(str, uuid)) {
                    Preconditions.checkState(this.leaderElectionDriver != null, "The leadership check should only return true if a driver is instantiated.");
                    Preconditions.checkState(!this.confirmedLeaderInformation.hasLeaderInformation(str), "No confirmation should have happened, yet.");
                    final LeaderInformation confirmedInformation = LeaderInformation.known(uuid, str2);
                    this.confirmedLeaderInformation = LeaderInformationRegister.merge(this.confirmedLeaderInformation, str, confirmedInformation);
                    this.leaderElectionDriver.publishLeaderInformation(str, confirmedInformation);
                } else if (uuid.equals(this.issuedLeaderSessionID)) {
                    // Argument order follows the placeholders: session ID first, component second.
                    LOG.warn("The leader session ID {} for component '{}' was confirmed even though the corresponding service was not elected as the leader or has been stopped already.", uuid, str);
                } else {
                    LOG.debug("Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).", uuid, str, this.issuedLeaderSessionID);
                }
            }
        }, this.leadershipOperationExecutor);
    }

    /**
     * Checks asynchronously, on the leadership executor, whether the given component holds the
     * leadership under the given session ID.
     *
     * @param str the component ID
     * @param uuid the leader session ID to check
     * @return future holding the result of {@link #hasLeadershipInternal(String, UUID)}
     */
    @Override // org.apache.flink.runtime.leaderelection.DefaultLeaderElection.ParentService
    protected CompletableFuture<Boolean> hasLeadershipAsync(String str, UUID uuid) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (this.lock) {
                return hasLeadershipInternal(str, uuid);
            }
        }, this.leadershipOperationExecutor);
    }

    /**
     * Returns {@code true} only if a driver exists, a contender is registered under the
     * component ID, the driver reports leadership and the session ID matches the issued one.
     */
    @GuardedBy("lock")
    private boolean hasLeadershipInternal(String componentId, UUID leaderSessionId) {
        if (this.leaderElectionDriver == null) {
            LOG.debug("hasLeadership is called after the service is closed, returning false.");
            return false;
        }
        if (!this.leaderContenderRegistry.containsKey(componentId)) {
            LOG.debug("hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.", componentId);
            return false;
        }
        return this.leaderElectionDriver.hasLeadership() && leaderSessionId.equals(this.issuedLeaderSessionID);
    }

    /**
     * Returns the session ID of the confirmed leader information for the given component.
     *
     * @param str the component ID
     * @return the session ID stored in the confirmed leader information of the component, or
     *     {@code null} if no contender is registered under the component ID
     */
    @VisibleForTesting
    @Nullable
    public UUID getLeaderSessionID(String str) {
        synchronized (this.lock) {
            if (!this.leaderContenderRegistry.containsKey(str)) {
                return null;
            }
            return this.confirmedLeaderInformation.forComponentIdOrEmpty(str).getLeaderSessionID();
        }
    }

    /** Stores the newly issued session ID and forwards the grant to every registered contender. */
    @GuardedBy("lock")
    private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
        Preconditions.checkNotNull(newLeaderSessionId);
        Preconditions.checkState(this.issuedLeaderSessionID == null, "The leadership should have been granted while not having the leadership acquired.");
        this.issuedLeaderSessionID = newLeaderSessionId;
        for (String componentId : this.leaderContenderRegistry.keySet()) {
            notifyLeaderContenderOfLeadership(componentId, newLeaderSessionId);
        }
    }

    /**
     * Grants leadership to the contender of the given component, unless no such contender is
     * registered or the passed session ID is not the currently issued one.
     */
    @GuardedBy("lock")
    private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionId) {
        if (!this.leaderContenderRegistry.containsKey(componentId)) {
            LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.", sessionId, this.leaderElectionDriver);
            return;
        }
        if (!sessionId.equals(this.issuedLeaderSessionID)) {
            LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.", sessionId, this.issuedLeaderSessionID);
            return;
        }
        Preconditions.checkState(!this.confirmedLeaderInformation.hasLeaderInformation(componentId), "The leadership should have been granted while not having the leadership acquired.");
        LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.", componentId, this.issuedLeaderSessionID);
        this.leaderContenderRegistry.get(componentId).grantLeadership(this.issuedLeaderSessionID);
    }

    /** Forwards the revocation to every registered contender and clears the issued session ID. */
    @GuardedBy("lock")
    private void onRevokeLeadershipInternal() {
        Preconditions.checkState(this.issuedLeaderSessionID != null, "The leadership should have been revoked while having the leadership acquired.");
        if (this.leaderContenderRegistry.isEmpty()) {
            LOG.debug("The revoke leadership notification for session {} is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.", this.issuedLeaderSessionID, this.leaderElectionDriver);
        } else {
            this.leaderContenderRegistry.forEach(this::notifyLeaderContenderOfLeadershipLoss);
        }
        this.issuedLeaderSessionID = null;
    }

    /**
     * Clears the confirmed leader information of the component and calls
     * {@link LeaderContender#revokeLeadership()} on the passed contender.
     */
    @GuardedBy("lock")
    private void notifyLeaderContenderOfLeadershipLoss(String componentId, LeaderContender contender) {
        Preconditions.checkState(contender != null, "The LeaderContender should be always set when calling this method.");
        if (this.confirmedLeaderInformation.hasLeaderInformation(componentId)) {
            LOG.debug("Revoking leadership to component '{}' for previously confirmed leader information {}.", componentId, LeaderElectionUtils.convertToString(this.confirmedLeaderInformation.forComponentIdOrEmpty(componentId)));
        } else {
            LOG.debug("Revoking leadership for component '{}' while a previous leadership grant wasn't confirmed, yet.", componentId);
        }
        this.confirmedLeaderInformation = LeaderInformationRegister.clear(this.confirmedLeaderInformation, componentId);
        contender.revokeLeadership();
    }

    /**
     * Re-publishes the locally confirmed leader information through the driver whenever the
     * externally observed information differs from it. Events are ignored while no driver is
     * instantiated.
     *
     * @param componentId the component the change refers to
     * @param externalInformation the leader information reported by the driver
     * @param confirmedInformation the leader information confirmed locally for the component
     */
    @GuardedBy("lock")
    private void notifyLeaderInformationChangeInternal(String componentId, LeaderInformation externalInformation, LeaderInformation confirmedInformation) {
        if (this.leaderElectionDriver == null) {
            LOG.debug("The LeaderElectionDriver was disconnected. Any incoming events will be ignored.");
            return;
        }
        if (confirmedInformation.equals(externalInformation)) {
            LOG.trace("LeaderInformation change event received but changed LeaderInformation actually matches the locally confirmed one: {}", confirmedInformation);
            return;
        }
        if (confirmedInformation.isEmpty()) {
            LOG.trace("Leader information changed while there's no confirmation available by the contender for component '{}', yet. Changed leader information {} will be reset.", componentId, LeaderElectionUtils.convertToString(externalInformation));
        } else if (externalInformation.isEmpty()) {
            LOG.debug("Re-writing leader information ({}) for component '{}' to overwrite the empty leader information in the external storage.", LeaderElectionUtils.convertToString(confirmedInformation), componentId);
        } else {
            LOG.debug("Correcting leader information for component '{}' (local: {}, external storage: {}).", componentId, LeaderElectionUtils.convertToString(confirmedInformation), LeaderElectionUtils.convertToString(externalInformation));
        }
        this.leaderElectionDriver.publishLeaderInformation(componentId, confirmedInformation);
    }

    /**
     * Submits the callback to the leadership executor. The callback is executed while holding
     * the lock and only if the service is still running and a driver is instantiated at
     * execution time. Uncaught failures are forwarded to
     * {@link #forwardErrorToLeaderContender(Throwable)}.
     *
     * @param eventName name of the event, used for logging only
     * @param callback the event handling logic
     */
    private void runInLeaderEventThread(String eventName, Runnable callback) {
        synchronized (this.lock) {
            if (!this.running) {
                LOG.debug("'{}' event processing was triggered while the DefaultLeaderElectionService is closed. The event will be ignored.", eventName);
                return;
            }
            LOG.debug("'{}' event processing triggered.", eventName);
            final CompletableFuture<Void> eventFuture = CompletableFuture.runAsync(() -> {
                synchronized (this.lock) {
                    if (!this.running) {
                        LOG.debug("Processing '{}' event omitted due to the service not being in running state, anymore.", eventName);
                    } else if (this.leaderElectionDriver == null) {
                        Preconditions.checkState(this.leaderContenderRegistry.isEmpty(), "All contenders should be deregistered when the driver is removed.");
                        LOG.debug("All contenders have been deregistered and the driver was shut down. Any incoming leadership event will be ignored.");
                    } else {
                        LOG.debug("Processing '{}' event.", eventName);
                        callback.run();
                    }
                }
            }, this.leadershipOperationExecutor);
            FutureUtils.handleUncaughtException(eventFuture, (thread, th) -> forwardErrorToLeaderContender(th));
        }
    }

    /**
     * Passes the error to every registered contender, wrapping it into a
     * {@link LeaderElectionException} if necessary. The fallback error handler is used if no
     * contender is registered.
     */
    private void forwardErrorToLeaderContender(Throwable th) {
        synchronized (this.lock) {
            if (this.leaderContenderRegistry.isEmpty()) {
                this.fallbackErrorHandler.onFatalError(th);
                return;
            }
            for (LeaderContender contender : this.leaderContenderRegistry.values()) {
                if (th instanceof LeaderElectionException) {
                    contender.handleError((LeaderElectionException) th);
                } else {
                    contender.handleError(new LeaderElectionException(th));
                }
            }
        }
    }

    /** Schedules the processing of a leadership grant with the given session ID. */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionDriver.Listener
    public void onGrantLeadership(UUID uuid) {
        runInLeaderEventThread(LEADER_ACQUISITION_EVENT_LOG_NAME, () -> onGrantLeadershipInternal(uuid));
    }

    /** Schedules the processing of a leadership revocation. */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionDriver.Listener
    public void onRevokeLeadership() {
        runInLeaderEventThread(LEADER_REVOCATION_EVENT_LOG_NAME, this::onRevokeLeadershipInternal);
    }

    /**
     * Handles a leader information change for a single component in the calling thread by
     * comparing it against the locally confirmed information.
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionDriver.Listener
    public void onLeaderInformationChange(String str, LeaderInformation leaderInformation) {
        synchronized (this.lock) {
            notifyLeaderInformationChangeInternal(str, leaderInformation, this.confirmedLeaderInformation.forComponentIdOrEmpty(str));
        }
    }

    /**
     * Handles a change of the whole leader information register in the calling thread. Every
     * registered component is checked; a component missing in the passed register is treated as
     * having empty leader information.
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionDriver.Listener
    public void onLeaderInformationChange(LeaderInformationRegister leaderInformationRegister) {
        synchronized (this.lock) {
            for (String componentId : this.leaderContenderRegistry.keySet()) {
                notifyLeaderInformationChangeInternal(componentId, leaderInformationRegister.forComponentId(componentId).orElse(LeaderInformation.empty()), this.confirmedLeaderInformation.forComponentIdOrEmpty(componentId));
            }
        }
    }

    /** Forwards a driver error to the registered contenders or the fallback error handler. */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionDriver.Listener
    public void onError(Throwable th) {
        forwardErrorToLeaderContender(th);
    }
}
