/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.ClientContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.AbortedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.GetSnapshotInfoRequest;
import alluxio.grpc.GetSnapshotInfoResponse;
import alluxio.grpc.GetSnapshotRequest;
import alluxio.grpc.JournalQueryRequest;
import alluxio.grpc.JournalQueryResponse;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.SnapshotMetadata;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import alluxio.master.MasterClientContext;
import alluxio.master.journal.raft.RaftJournalServiceClient;
import alluxio.master.journal.raft.RaftJournalSystem;
import alluxio.master.journal.raft.RaftJournalUtils;
import alluxio.master.journal.raft.SnapshotDownloader;
import alluxio.master.journal.raft.SnapshotUploader;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.security.authentication.ClientIpAddressInjector;
import alluxio.util.CommonUtils;
import alluxio.util.LogUtils;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotReplicationManager {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotReplicationManager.class);
    // Timeout in milliseconds, read once at class initialization from
    // MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS. checkRequestTimeout() compares the time
    // elapsed since mSnapshotRequestTime against this value.
    private static final long SNAPSHOT_REQUEST_TIMEOUT_MS = ServerConfiguration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS);
    // State machine storage used to look up the latest snapshot and to resolve snapshot file paths.
    private final SimpleStateMachineStorage mStorage;
    // Journal system used to check leadership, list quorum servers and send messages to peers.
    private final RaftJournalSystem mJournalSystem;
    // Timestamp (ms, from CommonUtils.getCurrentMs()) recorded just before the state moves to
    // REQUEST_DATA in requestSnapshotFromFollowers().
    private volatile long mSnapshotRequestTime = 0L;
    // Journal service client; created lazily in getJournalServiceClient() (or injected by the test
    // constructor) and reset to null by close().
    private volatile RaftJournalServiceClient mJournalServiceClient;
    // Snapshot handed over by a SnapshotDownloader once its future completes; read by
    // installDownloadedSnapshot().
    private volatile SnapshotInfo mDownloadedSnapshot;
    // Current replication state; starts at IDLE and is only changed through transitionState().
    private final AtomicReference<DownloadState> mDownloadState = new AtomicReference<DownloadState>(DownloadState.IDLE);

    /**
     * Creates a replication manager whose journal service client is created on first use.
     *
     * @param journalSystem the raft journal system this manager queries and sends messages through
     * @param storage the state machine storage holding the snapshots
     */
    public SnapshotReplicationManager(RaftJournalSystem journalSystem, SimpleStateMachineStorage storage) {
        mJournalSystem = journalSystem;
        mStorage = storage;
    }

    /**
     * Test-only constructor that injects the journal service client instead of creating it lazily.
     *
     * @param journalSystem the raft journal system this manager queries and sends messages through
     * @param storage the state machine storage holding the snapshots
     * @param client the journal service client to use
     */
    @VisibleForTesting
    SnapshotReplicationManager(RaftJournalSystem journalSystem, SimpleStateMachineStorage storage, RaftJournalServiceClient client) {
        this(journalSystem, storage);
        mJournalServiceClient = client;
    }

    /**
     * Downloads the snapshot offered by the leader and installs it locally.
     *
     * <p>The call is rejected when this server is the leader or when the state is not IDLE. Otherwise
     * the state moves to STREAM_DATA while the download runs; once the download future completes the
     * snapshot is recorded, the state moves to DOWNLOADED and the snapshot is installed.
     *
     * @return a future completed with the term-index of the installed snapshot, or completed
     *         exceptionally when the preconditions fail, the download fails, the installation fails
     *         or the installed index differs from the downloaded one
     */
    public CompletableFuture<TermIndex> installSnapshotFromLeader() {
        if (mJournalSystem.isLeader()) {
            return RaftJournalUtils.completeExceptionally(new IllegalStateException("Abort snapshot installation after becoming a leader"));
        }
        boolean started = transitionState(DownloadState.IDLE, DownloadState.STREAM_DATA);
        if (!started) {
            return RaftJournalUtils.completeExceptionally(new IllegalStateException("State is not IDLE when starting a snapshot installation"));
        }
        try {
            RaftJournalServiceClient client = getJournalServiceClient();
            String address = String.valueOf(client.getAddress());
            SnapshotDownloader<DownloadSnapshotPRequest, DownloadSnapshotPResponse> downloader = SnapshotDownloader.forFollower(mStorage, address);
            String timerName = MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_DOWNLOAD_TIMER.getName();
            Timer.Context downloadTimer = MetricsSystem.timer(timerName).time();
            client.downloadSnapshot(downloader);
            CompletableFuture<TermIndex> installed = downloader.getFuture().thenApplyAsync(downloadedIndex -> {
                // The download timer is only stopped when the download itself succeeded.
                downloadTimer.close();
                mDownloadedSnapshot = downloader.getSnapshotToInstall();
                transitionState(DownloadState.STREAM_DATA, DownloadState.DOWNLOADED);
                long installedIndex = installDownloadedSnapshot();
                if (installedIndex == -1L) {
                    throw new CompletionException(new RuntimeException(String.format("Failed to install the downloaded snapshot %s", downloadedIndex)));
                }
                if (installedIndex != downloadedIndex.getIndex()) {
                    throw new CompletionException(new IllegalStateException(String.format("Mismatched snapshot installed - downloaded %d, installed %d", downloadedIndex.getIndex(), installedIndex)));
                }
                return downloadedIndex;
            });
            return installed.whenComplete((ignored, failure) -> {
                if (failure == null) {
                    return;
                }
                LOG.error("Unexpected exception downloading snapshot from leader {}.", address, failure);
                transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
            });
        } catch (Exception e) {
            // Could not even start the download: undo the IDLE -> STREAM_DATA transition.
            transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
            return RaftJournalUtils.completeExceptionally(e);
        }
    }

    /**
     * Starts uploading the latest local snapshot to the leader.
     *
     * <p>Opens an upload stream through the journal service client and sends the first request,
     * which carries the snapshot term and index at offset 0. The remaining exchange is handled by
     * the {@link SnapshotUploader} registered as the response observer.
     *
     * @throws IllegalStateException if this server is currently the leader
     * @throws NotFoundException if the storage has no snapshot
     * @throws IOException if the journal service client cannot be obtained or connected
     */
    public void sendSnapshotToLeader() throws IOException {
        if (mJournalSystem.isLeader()) {
            throw new IllegalStateException("Server is no longer a follower");
        }
        LOG.debug("Checking latest snapshot to send");
        SingleFileSnapshotInfo snapshot = mStorage.getLatestSnapshot();
        if (snapshot == null) {
            throw new NotFoundException("No snapshot available");
        }
        SnapshotUploader<UploadSnapshotPRequest, UploadSnapshotPResponse> responseObserver = SnapshotUploader.forFollower(mStorage, snapshot);
        RaftJournalServiceClient client = getJournalServiceClient();
        LOG.info("Sending stream request to {} for snapshot {}", client.getAddress(), snapshot.getTermIndex());
        // Reuse the client obtained above so the logged address is the one actually used, instead of
        // taking the lock and connecting a second time.
        StreamObserver<UploadSnapshotPRequest> requestObserver = client.uploadSnapshot(responseObserver);
        UploadSnapshotPRequest firstRequest = UploadSnapshotPRequest.newBuilder()
            .setData(SnapshotData.newBuilder()
                .setSnapshotTerm(snapshot.getTerm())
                .setSnapshotIndex(snapshot.getIndex())
                .setOffset(0L))
            .build();
        // Pass the request with its real type; an Object-typed argument does not match onNext().
        requestObserver.onNext(firstRequest);
    }

    /**
     * Advances snapshot replication from followers by one step, depending on the current state.
     *
     * <ul>
     *   <li>DOWNLOADED: installs the downloaded snapshot and returns its result.</li>
     *   <li>REQUEST_DATA: checks whether the outstanding request has timed out.</li>
     *   <li>IDLE: asynchronously asks the followers for a snapshot.</li>
     * </ul>
     *
     * <p>The state is re-read before each check, so a request that just timed out (and therefore
     * moved back to IDLE) triggers a new asynchronous request within the same call.
     *
     * @return the value returned by the installation when the state was DOWNLOADED, otherwise -1
     */
    public long maybeCopySnapshotFromFollower() {
        if (mDownloadState.get() == DownloadState.DOWNLOADED) {
            return installDownloadedSnapshot();
        }

        if (mDownloadState.get() == DownloadState.REQUEST_DATA) {
            checkRequestTimeout();
        }

        if (mDownloadState.get() == DownloadState.IDLE) {
            CompletableFuture.runAsync(() -> requestSnapshotFromFollowers());
        }

        return -1L;
    }

    /**
     * Handles a snapshot upload stream opened by a follower.
     *
     * <p>The upload is only accepted while the state is REQUEST_DATA; in that case the state moves
     * to STREAM_DATA, and to DOWNLOADED once the download future completes (or back to IDLE if it
     * fails). In any other state the response stream is completed immediately.
     *
     * @param responseStreamObserver the stream used to respond to the follower
     * @return the observer that receives the follower's upload requests
     */
    public StreamObserver<UploadSnapshotPRequest> receiveSnapshotFromFollower(StreamObserver<UploadSnapshotPResponse> responseStreamObserver) {
        String followerIp = ClientIpAddressInjector.getIpAddress();
        LOG.info("Received upload snapshot request from follower {}", followerIp);
        SnapshotDownloader<UploadSnapshotPResponse, UploadSnapshotPRequest> downloader = SnapshotDownloader.forLeader(mStorage, responseStreamObserver, followerIp);
        if (transitionState(DownloadState.REQUEST_DATA, DownloadState.STREAM_DATA)) {
            downloader.getFuture()
                .thenApply(downloadedIndex -> {
                    mDownloadedSnapshot = downloader.getSnapshotToInstall();
                    transitionState(DownloadState.STREAM_DATA, DownloadState.DOWNLOADED);
                    return downloadedIndex;
                })
                .exceptionally(failure -> {
                    LOG.error("Unexpected exception downloading snapshot from follower {}.", followerIp, failure);
                    transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
                    return null;
                });
        } else {
            // Not expecting an upload right now: close the response stream straight away.
            responseStreamObserver.onCompleted();
        }
        return downloader;
    }

    /**
     * Answers a journal query sent by another server.
     *
     * <p>A snapshot info request is answered with the term and index of the latest local snapshot,
     * or with a default {@code GetSnapshotInfoResponse} when there is none. A snapshot request
     * starts an upload of the latest snapshot to the leader and is answered with an empty message.
     *
     * @param queryRequest the query to handle
     * @return the reply message, or {@code null} if the query is neither of the two supported kinds
     * @throws IOException if starting the snapshot upload fails
     */
    public Message handleRequest(JournalQueryRequest queryRequest) throws IOException {
        if (queryRequest.hasSnapshotInfoRequest()) {
            SingleFileSnapshotInfo newest = mStorage.getLatestSnapshot();
            if (newest != null) {
                SnapshotMetadata metadata = toSnapshotMetadata(newest.getTermIndex());
                GetSnapshotInfoResponse.Builder info = GetSnapshotInfoResponse.newBuilder().setLatest(metadata);
                JournalQueryResponse response = JournalQueryResponse.newBuilder().setSnapshotInfoResponse(info).build();
                LOG.debug("Sent snapshot info response {}", response);
                return toMessage(response);
            }
            LOG.debug("No snapshot to send");
            return toMessage(GetSnapshotInfoResponse.getDefaultInstance());
        }
        if (!queryRequest.hasSnapshotRequest()) {
            return null;
        }
        LOG.debug("Start sending snapshot to leader");
        sendSnapshotToLeader();
        return Message.EMPTY;
    }

    /**
     * Handles a snapshot download stream opened by a follower.
     *
     * <p>When a snapshot exists, the first response carrying its term and index at offset 0 is sent
     * immediately. When none exists, the response stream is failed with {@code NOT_FOUND}. An
     * uploader is returned in both cases so the caller always gets a request observer.
     *
     * @param responseObserver the stream used to send snapshot data to the follower
     * @return the observer that receives the follower's download requests
     */
    public StreamObserver<DownloadSnapshotPRequest> sendSnapshotToFollower(StreamObserver<DownloadSnapshotPResponse> responseObserver) {
        SingleFileSnapshotInfo snapshot = mStorage.getLatestSnapshot();
        LOG.debug("Received snapshot download request from {}", ClientIpAddressInjector.getIpAddress());
        // Built before the null check on purpose: a request observer is returned even on failure.
        SnapshotUploader<DownloadSnapshotPResponse, DownloadSnapshotPRequest> requestStreamObserver = SnapshotUploader.forLeader(mStorage, snapshot, responseObserver);
        if (snapshot == null) {
            responseObserver.onError(Status.NOT_FOUND.withDescription("Cannot find a valid snapshot to download.").asException());
            return requestStreamObserver;
        }
        DownloadSnapshotPResponse firstResponse = DownloadSnapshotPResponse.newBuilder()
            .setData(SnapshotData.newBuilder()
                .setSnapshotTerm(snapshot.getTerm())
                .setSnapshotIndex(snapshot.getIndex())
                .setOffset(0L))
            .build();
        // Pass the response with its real type; an Object-typed argument does not match onNext().
        responseObserver.onNext(firstResponse);
        return requestStreamObserver;
    }

    /**
     * Wraps a protobuf message into a Ratis {@link Message}.
     *
     * @param value the protobuf message to serialize
     * @return a message whose content wraps a read-only buffer over the serialized bytes of
     *         {@code value}
     */
    private static Message toMessage(MessageLite value) {
        ByteBuffer serialized = value.toByteString().asReadOnlyByteBuffer();
        ByteString content = UnsafeByteOperations.unsafeWrap(serialized);
        return Message.valueOf(content);
    }

    /**
     * Converts a term-index pair into its protobuf representation.
     *
     * @param value the term-index to convert, may be {@code null}
     * @return the metadata carrying the same term and index, or {@code null} if {@code value} is
     *         {@code null}
     */
    private SnapshotMetadata toSnapshotMetadata(TermIndex value) {
        if (value == null) {
            return null;
        }
        SnapshotMetadata.Builder metadata = SnapshotMetadata.newBuilder();
        metadata.setSnapshotTerm(value.getTerm());
        metadata.setSnapshotIndex(value.getIndex());
        return metadata.build();
    }

    /**
     * Atomically moves the download state from {@code expected} to {@code update}.
     *
     * @param expected the state the manager must currently be in
     * @param update the state to move to
     * @return {@code true} if the state was changed; {@code false} (after logging a warning) if the
     *         current state was not {@code expected}
     */
    private boolean transitionState(DownloadState expected, DownloadState update) {
        if (mDownloadState.compareAndSet(expected, update)) {
            return true;
        }
        LOG.warn("Failed to transition from {} to {}: current state is {}", expected, update, mDownloadState.get());
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private long installDownloadedSnapshot() {
        if (!this.transitionState(DownloadState.DOWNLOADED, DownloadState.INSTALLING)) {
            return -1L;
        }
        File tempFile = null;
        try {
            long l;
            Throwable throwable;
            Timer.Context ctx;
            block23: {
                block24: {
                    ctx = MetricsSystem.timer((String)MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_INSTALL_TIMER.getName()).time();
                    throwable = null;
                    SnapshotInfo snapshot = this.mDownloadedSnapshot;
                    if (snapshot == null) {
                        throw new IllegalStateException("Snapshot is not completed");
                    }
                    FileInfo fileInfo = (FileInfo)snapshot.getFiles().get(0);
                    tempFile = fileInfo.getPath().toFile();
                    if (!tempFile.exists()) {
                        throw new FileNotFoundException(String.format("Snapshot file %s is not found", tempFile));
                    }
                    SingleFileSnapshotInfo latestSnapshot = this.mStorage.getLatestSnapshot();
                    TermIndex lastInstalled = latestSnapshot == null ? null : latestSnapshot.getTermIndex();
                    TermIndex downloaded = snapshot.getTermIndex();
                    if (lastInstalled != null && downloaded.compareTo(lastInstalled) < 0) {
                        throw new AbortedException(String.format("Snapshot to be installed %s is older than current snapshot %s", downloaded, lastInstalled));
                    }
                    File snapshotFile = this.mStorage.getSnapshotFile(downloaded.getTerm(), downloaded.getIndex());
                    LOG.debug("Moving temp snapshot {} to file {}", (Object)tempFile, (Object)snapshotFile);
                    MD5FileUtil.saveMD5File((File)snapshotFile, (MD5Hash)fileInfo.getFileDigest());
                    if (!tempFile.renameTo(snapshotFile)) {
                        throw new IOException(String.format("Failed to rename %s to %s", tempFile, snapshotFile));
                    }
                    this.mStorage.loadLatestSnapshot();
                    LOG.info("Completed storing snapshot at {} to file {}", (Object)downloaded, (Object)snapshotFile);
                    l = downloaded.getIndex();
                    if (ctx == null) break block23;
                    if (throwable == null) break block24;
                    try {
                        ctx.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                    break block23;
                }
                ctx.close();
            }
            return l;
            catch (Throwable throwable3) {
                try {
                    try {
                        throwable = throwable3;
                        throw throwable3;
                    }
                    catch (Throwable throwable4) {
                        if (ctx != null) {
                            if (throwable != null) {
                                try {
                                    ctx.close();
                                }
                                catch (Throwable throwable5) {
                                    throwable.addSuppressed(throwable5);
                                }
                            } else {
                                ctx.close();
                            }
                        }
                        throw throwable4;
                    }
                }
                catch (Exception e) {
                    LOG.error("Failed to install snapshot", (Throwable)e);
                    if (tempFile != null) {
                        tempFile.delete();
                    }
                    long l2 = -1L;
                    return l2;
                }
            }
        }
        finally {
            this.transitionState(DownloadState.INSTALLING, DownloadState.IDLE);
        }
    }

    /**
     * Finds the follower holding the most recent snapshot and asks it to upload that snapshot.
     *
     * <p>Runs only when the state is IDLE. The state moves to REQUEST_INFO while every available
     * quorum server other than this one is asked for its latest snapshot info. A follower is
     * selected when it reports a snapshot and either no snapshot has been seen so far, or its term
     * is not lower and its index is strictly higher than the best seen so far (starting from the
     * local snapshot). If a follower is selected, the request time is recorded, the state moves to
     * REQUEST_DATA and a snapshot request is sent to that follower. Any failure resets the state to
     * IDLE.
     */
    private void requestSnapshotFromFollowers() {
        if (!transitionState(DownloadState.IDLE, DownloadState.REQUEST_INFO)) {
            return;
        }
        RaftPeerId snapshotOwner = null;
        try {
            SingleFileSnapshotInfo latestSnapshot = mStorage.getLatestSnapshot();
            SnapshotMetadata snapshotMetadata = latestSnapshot == null ? null : SnapshotMetadata.newBuilder().setSnapshotTerm(latestSnapshot.getTerm()).setSnapshotIndex(latestSnapshot.getIndex()).build();
            // Send the info request to every other available server up front, then collect replies.
            Map<RaftPeerId, CompletableFuture<RaftClientReply>> jobs = mJournalSystem.getQuorumServerInfoList().stream()
                .filter(server -> server.getServerState() == QuorumServerState.AVAILABLE)
                .map(server -> RaftJournalUtils.getPeerId(server.getServerAddress().getHost(), server.getServerAddress().getRpcPort()))
                .filter(peerId -> !peerId.equals(mJournalSystem.getLocalPeerId()))
                .collect(Collectors.toMap(Function.identity(), peerId -> mJournalSystem.sendMessageAsync(peerId, toMessage(JournalQueryRequest.newBuilder().setSnapshotInfoRequest(GetSnapshotInfoRequest.getDefaultInstance()).build()))));
            for (Map.Entry<RaftPeerId, CompletableFuture<RaftClientReply>> job : jobs.entrySet()) {
                RaftPeerId follower = job.getKey();
                RaftClientReply reply;
                try {
                    reply = job.getValue().get();
                } catch (Exception e) {
                    LOG.warn("Exception thrown while requesting snapshot info {}", e.toString());
                    continue;
                }
                if (reply.getException() != null) {
                    LOG.warn("Received exception requesting snapshot info {}", reply.getException().getMessage());
                    continue;
                }
                JournalQueryResponse response;
                try {
                    response = JournalQueryResponse.parseFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
                } catch (InvalidProtocolBufferException e) {
                    LOG.warn("Failed to parse response {}", e.toString());
                    continue;
                }
                LOG.debug("Received snapshot info from follower {} - {}", follower, response);
                if (!response.hasSnapshotInfoResponse()) {
                    LOG.warn("Invalid response for GetSnapshotInfoRequest {}", response);
                    continue;
                }
                GetSnapshotInfoResponse infoResponse = response.getSnapshotInfoResponse();
                // Protobuf getters never return null, so presence must be checked explicitly;
                // otherwise an unset field reads as a default (term 0, index 0) snapshot.
                if (!infoResponse.hasLatest()) {
                    LOG.debug("Follower {} does not have a snapshot", follower);
                    continue;
                }
                SnapshotMetadata latest = infoResponse.getLatest();
                if (snapshotMetadata != null && (latest.getSnapshotTerm() < snapshotMetadata.getSnapshotTerm() || latest.getSnapshotIndex() <= snapshotMetadata.getSnapshotIndex())) {
                    continue;
                }
                snapshotMetadata = latest;
                snapshotOwner = follower;
            }
            if (snapshotOwner == null) {
                throw new UnavailableException("No recent snapshot found from followers");
            }
        } catch (Exception e) {
            LogUtils.warnWithException(LOG, "Failed to request snapshot info from followers", new Object[]{e});
            transitionState(DownloadState.REQUEST_INFO, DownloadState.IDLE);
            return;
        }
        LOG.info("Request snapshot data from follower {}", snapshotOwner);
        // Record the request time before entering REQUEST_DATA so the timeout check never sees a
        // stale timestamp.
        mSnapshotRequestTime = CommonUtils.getCurrentMs();
        transitionState(DownloadState.REQUEST_INFO, DownloadState.REQUEST_DATA);
        try {
            RaftClientReply reply = mJournalSystem.sendMessageAsync(snapshotOwner, toMessage(JournalQueryRequest.newBuilder().setSnapshotRequest(GetSnapshotRequest.getDefaultInstance()).build())).get();
            if (reply.getException() != null) {
                throw reply.getException();
            }
        } catch (Exception e) {
            LOG.error("Failed to request snapshot data from {}", snapshotOwner, e);
            transitionState(DownloadState.REQUEST_DATA, DownloadState.IDLE);
        }
    }

    /**
     * Gives up on the outstanding snapshot data request when it has been pending for longer than
     * {@code SNAPSHOT_REQUEST_TIMEOUT_MS}, by moving the state from REQUEST_DATA back to IDLE.
     */
    private void checkRequestTimeout() {
        long elapsedMs = CommonUtils.getCurrentMs() - mSnapshotRequestTime;
        if (elapsedMs <= SNAPSHOT_REQUEST_TIMEOUT_MS) {
            return;
        }
        transitionState(DownloadState.REQUEST_DATA, DownloadState.IDLE);
    }

    /**
     * Returns the journal service client, creating it from the global server configuration on first
     * use, and calls {@code connect()} on it before returning.
     *
     * @return the journal service client
     * @throws AlluxioStatusException if the client fails to connect
     */
    private synchronized RaftJournalServiceClient getJournalServiceClient() throws AlluxioStatusException {
        RaftJournalServiceClient client = mJournalServiceClient;
        if (client == null) {
            AlluxioConfiguration conf = ServerConfiguration.global();
            ClientContext clientContext = ClientContext.create(conf);
            client = new RaftJournalServiceClient(MasterClientContext.newBuilder(clientContext).build());
            mJournalServiceClient = client;
        }
        client.connect();
        return client;
    }

    /**
     * Closes the journal service client, if one exists, and forgets it so that a new client is
     * created the next time one is needed.
     */
    public synchronized void close() {
        RaftJournalServiceClient client = mJournalServiceClient;
        if (client == null) {
            return;
        }
        client.close();
        mJournalServiceClient = null;
    }

    private static enum DownloadState {
        IDLE,
        REQUEST_INFO,
        REQUEST_DATA,
        STREAM_DATA,
        DOWNLOADED,
        INSTALLING;

    }
}

