package alluxio.master.backup;

import alluxio.collections.ConcurrentHashSet;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.AlluxioException;
import alluxio.exception.BackupAbortedException;
import alluxio.exception.BackupDelegationException;
import alluxio.exception.BackupException;
import alluxio.grpc.BackupPRequest;
import alluxio.grpc.BackupState;
import alluxio.grpc.BackupStatusPRequest;
import alluxio.grpc.GrpcService;
import alluxio.grpc.ServiceType;
import alluxio.master.CoreMasterContext;
import alluxio.master.StateLockManager;
import alluxio.master.StateLockOptions;
import alluxio.master.transport.GrpcMessagingConnection;
import alluxio.master.transport.GrpcMessagingServiceClientHandler;
import alluxio.resource.LockResource;
import alluxio.security.authentication.ClientIpAddressInjector;
import alluxio.util.ConfigurationUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BackupStatus;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/backup/BackupLeaderRole.class */
/**
 * Backup role that runs on the backup-leader.
 *
 * <p>It exposes a messaging service that backup-workers connect to, remembers the hostname each
 * worker announces in its handshake, and serves backup requests either by taking the backup
 * locally or by delegating it to one of the connected backup-workers. While a delegated backup
 * is running, worker heartbeats feed the backup tracker and re-arm a timer that marks the backup
 * as failed when no heartbeat arrives within the configured abandon timeout.
 */
public class BackupLeaderRole extends AbstractBackupRole {
    private static final Logger LOG = LoggerFactory.getLogger(BackupLeaderRole.class);

    /** Abandon timeout in milliseconds, read from {@code MASTER_BACKUP_ABANDON_TIMEOUT}. */
    private final long mBackupAbandonTimeout;
    /** Source of the exclusive state lock taken while backing up / reading journal sequences. */
    private final StateLockManager mStateLockManager;
    /** Pending abandon-timeout task, or {@code null} when none has been scheduled yet. */
    private ScheduledFuture<?> mTimeoutBackupFuture;
    /** Time at which the abandon timeout was last (re)armed. */
    private Instant mLastHeartBeat;
    /** Handle to the most recently submitted local backup task, or {@code null}. */
    private Future<?> mLocalBackupFuture;
    /** Connection of the worker the current backup was delegated to, or {@code null}. */
    private GrpcMessagingConnection mRemoteBackupConnection;
    /** Every backup-worker connection that has been activated and not yet closed. */
    private final Set<GrpcMessagingConnection> mBackupWorkerConnections;
    /** Hostnames announced by workers through their handshake, keyed by connection. */
    private final Map<GrpcMessagingConnection, String> mBackupWorkerHostNames;
    /** Fair lock serializing the "is a backup running? / start a new one" decision. */
    private final Lock mBackupInitiateLock;

    /**
     * Creates the backup-leader role.
     *
     * @param coreMasterContext master context supplying the state lock manager
     */
    public BackupLeaderRole(CoreMasterContext coreMasterContext) {
        super(coreMasterContext);
        this.mBackupWorkerConnections = new ConcurrentHashSet<>();
        this.mBackupWorkerHostNames = new ConcurrentHashMap<>();
        this.mBackupInitiateLock = new ReentrantLock(true);
        LOG.info("Creating backup-leader role.");
        this.mStateLockManager = coreMasterContext.getStateLockManager();
        this.mBackupAbandonTimeout = ServerConfiguration.getMs(PropertyKey.MASTER_BACKUP_ABANDON_TIMEOUT);
    }

    /**
     * Closes all backup-worker connections, cancels an unfinished local backup task and then
     * closes the parent role. Does nothing when the role is already closed.
     *
     * @throws IOException if closing the parent role fails
     */
    @Override
    public void close() throws IOException {
        if (this.mRoleClosed) {
            return;
        }
        LOG.info("Closing backup-leader role.");
        // Start closing every connection first, then wait for all of them together.
        ArrayList<CompletableFuture<?>> closeFutures = new ArrayList<>(this.mBackupWorkerConnections.size());
        for (GrpcMessagingConnection connection : this.mBackupWorkerConnections) {
            closeFutures.add(connection.close());
        }
        try {
            CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0])).get();
        } catch (Exception e) {
            // Best effort: a connection that fails to close must not prevent the role from closing.
            LOG.warn("Failed to close {} backup-worker connections. Error: {}", closeFutures.size(), e);
        }
        this.mBackupWorkerConnections.clear();
        this.mBackupWorkerHostNames.clear();
        if (this.mLocalBackupFuture != null && !this.mLocalBackupFuture.isDone()) {
            this.mLocalBackupFuture.cancel(true);
        }
        super.close();
    }

    /**
     * Builds the gRPC services this role contributes: a single backup messaging service whose
     * incoming connections are handed to {@link #activateWorkerConnection}.
     *
     * @return map containing the {@code META_MASTER_BACKUP_MESSAGING_SERVICE} entry
     */
    @Override
    public Map<ServiceType, GrpcService> getRoleServices() {
        GrpcMessagingServiceClientHandler messagingHandler = new GrpcMessagingServiceClientHandler(
                NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC, ServerConfiguration.global()),
                this::activateWorkerConnection,
                this.mGrpcMessagingContext,
                this.mExecutorService,
                this.mCatalystRequestTimeout);
        Map<ServiceType, GrpcService> services = new HashMap<>();
        services.put(ServiceType.META_MASTER_BACKUP_MESSAGING_SERVICE,
                new GrpcService(ServerInterceptors.intercept(messagingHandler,
                        new ServerInterceptor[]{new ClientIpAddressInjector()})).withCloseable(this));
        return services;
    }

    /**
     * Starts a new backup, either locally or delegated to a backup-worker.
     *
     * <p>Delegation is used when it is enabled in configuration, the cluster is in HA mode and
     * the request does not bypass it. If delegation is wanted but no worker has completed a
     * handshake, the backup falls back to the leader only when the request allows it.
     *
     * @param backupPRequest the backup request
     * @param stateLockOptions options used when acquiring the exclusive state lock
     * @return an {@code Initiating} status for asynchronous requests, otherwise the tracker's
     *         status for this backup once it has finished
     * @throws AlluxioException if a backup is already in progress, if delegation is required but
     *         impossible, or if the backup could not be delegated to any worker
     */
    @Override
    public BackupStatus backup(BackupPRequest backupPRequest, StateLockOptions stateLockOptions) throws AlluxioException {
        boolean delegateBackup;
        UUID backupId;
        // The initiate lock only guards the in-progress check and the tracker reset. It is released
        // exactly once, before any scheduling or waiting happens.
        try (LockResource initiateLock = new LockResource(this.mBackupInitiateLock)) {
            if (this.mBackupTracker.inProgress()) {
                throw new BackupException("Backup in progress");
            }
            delegateBackup = ServerConfiguration.getBoolean(PropertyKey.MASTER_BACKUP_DELEGATION_ENABLED)
                    && ConfigurationUtils.isHaMode(ServerConfiguration.global());
            if (delegateBackup && backupPRequest.getOptions().getBypassDelegation()) {
                LOG.info("Back-up delegation is suppressed by back-up client.");
                delegateBackup = false;
            }
            if (delegateBackup && this.mBackupWorkerHostNames.size() == 0) {
                if (!backupPRequest.getOptions().getAllowLeader()) {
                    throw new BackupDelegationException("No master found to delegate backup.");
                }
                LOG.info("Back-up delegation is deactivated for cluster with no followers.");
                delegateBackup = false;
            }
            this.mBackupTracker.reset();
            this.mBackupTracker.updateState(BackupState.Initiating);
            this.mBackupTracker.updateHostname(NetworkAddressUtils.getLocalHostName(
                    (int) ServerConfiguration.global().getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS)));
            backupId = this.mBackupTracker.getCurrentStatus().getBackupId();
        }

        if (!delegateBackup) {
            scheduleLocalBackup(backupPRequest, stateLockOptions);
        } else if (!scheduleRemoteBackup(backupId, backupPRequest, stateLockOptions)) {
            LOG.error("Failed to schedule remote backup.");
            AlluxioException delegationFailure = new BackupDelegationException("Failed to delegate the backup.");
            // Record the failure in the tracker so status queries for this backup report it.
            this.mBackupTracker.updateError(delegationFailure);
            throw delegationFailure;
        }

        if (backupPRequest.getOptions().getRunAsync()) {
            return new BackupStatus(backupId, BackupState.Initiating);
        }
        this.mBackupTracker.waitUntilFinished();
        return this.mBackupTracker.getStatus(backupId);
    }

    /**
     * Looks up the status of a backup by the id carried in the request.
     *
     * @param backupStatusPRequest request holding the backup id as a UUID string
     * @return the status the backup tracker holds for that id
     * @throws AlluxioException if the tracker cannot provide the status
     */
    @Override
    public BackupStatus getBackupStatus(BackupStatusPRequest backupStatusPRequest) throws AlluxioException {
        return this.mBackupTracker.getStatus(UUID.fromString(backupStatusPRequest.getBackupId()));
    }

    /**
     * Wires message, exception and close handlers onto a newly accepted backup-worker connection
     * and starts tracking it.
     *
     * @param grpcMessagingConnection the connection opened by a backup-worker
     */
    private void activateWorkerConnection(GrpcMessagingConnection grpcMessagingConnection) {
        LOG.info("Backup-leader connected with backup-worker: {}", grpcMessagingConnection);
        grpcMessagingConnection.handler(BackupHandshakeMessage.class, handshake -> {
            // Attach the connection so the handshake handler can key the hostname by it.
            handshake.setConnection(grpcMessagingConnection);
            return handleHandshakeMessage(handshake);
        });
        grpcMessagingConnection.handler(BackupHeartbeatMessage.class, this::handleHeartbeatMessage);
        grpcMessagingConnection.onException(error -> {
            LOG.warn(String.format("Backup-worker connection failed for %s.", grpcMessagingConnection), error);
        });
        grpcMessagingConnection.onClose(closedConnection -> {
            LOG.info("Backup-worker connection closed for {}.", grpcMessagingConnection);
            this.mBackupWorkerConnections.remove(closedConnection);
            String workerHostname = this.mBackupWorkerHostNames.remove(closedConnection);
            // Losing the worker that is running the delegated backup fails that backup.
            if (this.mBackupTracker.inProgress() && this.mRemoteBackupConnection != null
                    && this.mRemoteBackupConnection.equals(closedConnection)) {
                LOG.warn("Abandoning current backup as backup-worker: {} is lost.", workerHostname);
                this.mBackupTracker.updateError(new BackupAbortedException("Backup-worker is lost."));
                this.mRemoteBackupConnection = null;
            }
        });
        this.mBackupWorkerConnections.add(grpcMessagingConnection);
    }

    /**
     * Submits a task that takes the backup on this master while holding the exclusive state lock.
     * The outcome (URI and final state, or the error) is reported through the backup tracker.
     *
     * @param backupPRequest the backup request
     * @param stateLockOptions options used when acquiring the exclusive state lock
     */
    private void scheduleLocalBackup(BackupPRequest backupPRequest, StateLockOptions stateLockOptions) {
        LOG.info("Scheduling backup at the backup-leader.");
        this.mLocalBackupFuture = this.mExecutorService.submit(() -> {
            // try-with-resources guarantees the state lock is released even when the backup fails.
            try (LockResource stateLock = this.mStateLockManager.lockExclusive(stateLockOptions)) {
                this.mBackupTracker.updateState(BackupState.Running);
                this.mBackupTracker.updateBackupUri(takeBackup(backupPRequest, this.mBackupTracker.getEntryCounter()));
                this.mBackupTracker.updateState(BackupState.Completed);
            } catch (Exception e) {
                LOG.error("Local backup failed at the backup-leader.", e);
                this.mBackupTracker.updateError(
                        new BackupException(String.format("Local backup failed: %s", e.getMessage()), e));
            }
        });
    }

    /**
     * Tries to delegate the backup to one of the backup-workers that completed a handshake.
     *
     * <p>Workers are tried in map iteration order. For each one the exclusive state lock is taken
     * (handing the lock manager a callback that sends a {@link BackupSuspendMessage} to the
     * worker), the current journal sequence numbers are captured, the lock is released and the
     * backup request is sent. A failure with one worker is logged and the next worker is tried.
     *
     * @param uuid id of the backup being delegated
     * @param backupPRequest the backup request
     * @param stateLockOptions options used when acquiring the exclusive state lock
     * @return {@code true} once a worker accepted the request, {@code false} if none did
     */
    private boolean scheduleRemoteBackup(UUID uuid, BackupPRequest backupPRequest, StateLockOptions stateLockOptions) {
        LOG.info("Scheduling backup at remote backup-worker.");
        for (Map.Entry<GrpcMessagingConnection, String> workerEntry : this.mBackupWorkerHostNames.entrySet()) {
            try {
                final BackupRequestMessage requestMessage;
                // Hold the state lock only while capturing the journal sequences; it is released
                // (also on failure) before the request is sent to the worker.
                try (LockResource stateLock = this.mStateLockManager.lockExclusive(stateLockOptions, () -> {
                    LOG.info("Suspending journals at backup-worker: {}", workerEntry.getValue());
                    sendMessageBlocking(workerEntry.getKey(), new BackupSuspendMessage());
                })) {
                    requestMessage = new BackupRequestMessage(uuid, backupPRequest,
                            this.mJournalSystem.getCurrentSequenceNumbers());
                }
                LOG.info("Sending backup request to backup-worker: {}. Request message: {}",
                        workerEntry.getValue(), requestMessage);
                sendMessageBlocking(workerEntry.getKey(), requestMessage);
                this.mRemoteBackupConnection = workerEntry.getKey();
                // Arm the abandon timer; heartbeats from the worker will keep re-arming it.
                adjustAbandonTimeout(false);
                LOG.info("Delegated the backup to backup-worker: {}", workerEntry.getValue());
                return true;
            } catch (Exception e) {
                LOG.warn(String.format("Failed to delegate backup to a backup-worker: %s", workerEntry.getValue()), e);
            }
        }
        return false;
    }

    /**
     * Records the hostname announced by a backup-worker for its connection.
     *
     * @param backupHandshakeMessage handshake carrying the connection and the worker hostname
     * @return an already completed future
     */
    private synchronized CompletableFuture<Void> handleHandshakeMessage(BackupHandshakeMessage backupHandshakeMessage) {
        LOG.info("Received handshake message:{}", backupHandshakeMessage);
        this.mBackupWorkerHostNames.put(backupHandshakeMessage.getConnection(),
                backupHandshakeMessage.getBackupWorkerHostname());
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies the status carried by a worker heartbeat to the backup tracker and re-arms (or,
     * once the backup is no longer in progress, disarms) the abandon timer. Heartbeats without a
     * status are only logged.
     *
     * @param backupHeartbeatMessage heartbeat received from a backup-worker
     * @return an already completed future
     */
    private synchronized CompletableFuture<Void> handleHeartbeatMessage(BackupHeartbeatMessage backupHeartbeatMessage) {
        LOG.info("Received heartbeat message:{}", backupHeartbeatMessage);
        if (backupHeartbeatMessage.getBackupStatus() != null) {
            this.mBackupTracker.update(backupHeartbeatMessage.getBackupStatus());
            adjustAbandonTimeout(!this.mBackupTracker.inProgress());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Stamps the current time as the last heartbeat, cancels any pending abandon task and, unless
     * the backup has finished, schedules a new one after the abandon timeout.
     *
     * @param backupFinished {@code true} to only cancel the pending task without re-arming it
     */
    private void adjustAbandonTimeout(boolean backupFinished) {
        this.mLastHeartBeat = Instant.now();
        if (this.mTimeoutBackupFuture != null) {
            this.mTimeoutBackupFuture.cancel(true);
        }
        if (backupFinished) {
            return;
        }
        this.mTimeoutBackupFuture = this.mTaskScheduler.schedule(() -> {
            // Only abandon when no newer heartbeat moved the timestamp forward in the meantime.
            if (Duration.between(this.mLastHeartBeat, Instant.now()).toMillis() >= this.mBackupAbandonTimeout) {
                LOG.error("Abandoning the backup after not hearing for {}ms.", this.mBackupAbandonTimeout);
                this.mBackupTracker.updateError(new BackupAbortedException("Backup timed out"));
            }
        }, this.mBackupAbandonTimeout, TimeUnit.MILLISECONDS);
    }
}
