/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import alluxio.master.journal.raft.RaftJournalUtils;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream observer that receives a snapshot as a sequence of messages and writes it to a
 * temporary file created through {@link RaftJournalUtils#createTempSnapshotFile}.
 *
 * <p>The first message received only establishes the term/index of the snapshot; every
 * following message must carry a chunk whose offset equals the current position in the
 * temporary file. After each message that is not the last one, an acknowledgement built by
 * the message builder is sent back on the outgoing stream. When a message flagged as EOF has
 * been written, the file's MD5 digest is computed, the snapshot is exposed through
 * {@link #getSnapshotToInstall()}, and the future returned by {@link #getFuture()} completes
 * with the snapshot's term/index.
 *
 * <p>If the download fails, the future completes exceptionally and the temporary file is
 * removed. Failures reported after the snapshot was fully downloaded do not remove the file.
 *
 * @param <S> type of the acknowledgement messages sent on the outgoing stream
 * @param <R> type of the messages received, each wrapping a {@link SnapshotData}
 */
public class SnapshotDownloader<S, R>
implements ClientResponseObserver<S, R> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotDownloader.class);
    private final SimpleStateMachineStorage mStorage;
    /** Completed with the snapshot's term/index on success, or exceptionally on failure. */
    private final CompletableFuture<TermIndex> mFuture = new CompletableFuture<>();
    /** Builds the acknowledgement message for a given number of bytes received. */
    private final Function<Long, S> mMessageBuilder;
    /** Extracts the snapshot payload from a received message. */
    private final Function<R, SnapshotData> mDataGetter;
    /** Description of the remote side, used in log messages only. */
    private final String mSource;
    /** Term/index announced by the first message; {@code null} until it has been received. */
    private TermIndex mTermIndex;
    private File mTempFile;
    /** Open only while chunks are being written; {@code null} before, after, and on failure. */
    private FileOutputStream mOutputStream;
    private long mBytesWritten = 0L;
    /** Outgoing stream; for the follower variant it is only set in {@link #beforeStart}. */
    private StreamObserver<S> mStream;
    /** Set once the snapshot has been completely written; {@code null} otherwise. */
    private SnapshotInfo mSnapshotToInstall;

    /**
     * Creates a downloader that consumes {@link UploadSnapshotPRequest} messages and answers
     * with {@link UploadSnapshotPResponse} acknowledgements on the given stream.
     *
     * @param storage the storage in which the temporary snapshot file is created
     * @param stream the stream on which acknowledgements are sent
     * @param source description of the remote side, used for logging
     * @return the downloader
     */
    public static SnapshotDownloader<UploadSnapshotPResponse, UploadSnapshotPRequest> forLeader(SimpleStateMachineStorage storage, StreamObserver<UploadSnapshotPResponse> stream, String source) {
        return new SnapshotDownloader<UploadSnapshotPResponse, UploadSnapshotPRequest>(storage, offset -> UploadSnapshotPResponse.newBuilder().setOffsetReceived(offset.longValue()).build(), UploadSnapshotPRequest::getData, stream, source);
    }

    /**
     * Creates a downloader that consumes {@link DownloadSnapshotPResponse} messages and answers
     * with {@link DownloadSnapshotPRequest} acknowledgements. The outgoing stream is not known
     * at construction time; it is supplied later through {@link #beforeStart}.
     *
     * @param storage the storage in which the temporary snapshot file is created
     * @param source description of the remote side, used for logging
     * @return the downloader
     */
    public static SnapshotDownloader<DownloadSnapshotPRequest, DownloadSnapshotPResponse> forFollower(SimpleStateMachineStorage storage, String source) {
        return new SnapshotDownloader<DownloadSnapshotPRequest, DownloadSnapshotPResponse>(storage, offset -> DownloadSnapshotPRequest.newBuilder().setOffsetReceived(offset.longValue()).build(), DownloadSnapshotPResponse::getData, null, source);
    }

    private SnapshotDownloader(SimpleStateMachineStorage storage, Function<Long, S> messageBuilder, Function<R, SnapshotData> dataGetter, StreamObserver<S> stream, String source) {
        mStorage = storage;
        mMessageBuilder = messageBuilder;
        mDataGetter = dataGetter;
        mStream = stream;
        mSource = source;
    }

    /**
     * Handles one received message. Any exception raised while processing it is reported on
     * the outgoing stream and fails the download.
     *
     * @param response the received message
     */
    @Override
    public void onNext(R response) {
        try {
            onNextInternal(response);
        } catch (Exception e) {
            try {
                mStream.onError(e);
            } finally {
                // Reporting the error may itself throw (e.g. the call is already closed).
                // The future must still be completed and the partial file released.
                fail(e);
            }
        }
    }

    /**
     * Records a failure: completes the future exceptionally (a no-op if it is already done)
     * and releases the partial download. If the snapshot was already fully downloaded, the
     * file is kept, because the future has already handed it to whoever installs it.
     */
    private void fail(Throwable cause) {
        mFuture.completeExceptionally(cause);
        if (mSnapshotToInstall != null) {
            LOG.warn("Ignoring error received after snapshot file {} was completely downloaded", mTempFile, cause);
            return;
        }
        cleanup();
    }

    /**
     * Closes the output stream if it is open and deletes the temporary file if it exists.
     * Safe to call more than once.
     */
    private void cleanup() {
        if (mOutputStream != null) {
            try {
                mOutputStream.close();
            } catch (IOException ioException) {
                LOG.error("Error closing snapshot file {}", mTempFile, ioException);
            }
            // Forget the stream so that repeated cleanups and onCompleted() see a closed state.
            mOutputStream = null;
        }
        // The existence check avoids a spurious error when cleanup runs a second time.
        if (mTempFile != null && mTempFile.exists() && !mTempFile.delete()) {
            LOG.error("Error deleting snapshot file {}", mTempFile.getPath());
        }
    }

    /**
     * Processes one message: the first one starts the download, later ones append a chunk.
     *
     * @throws IOException if the term/index differs from the first message, the chunk is
     *         missing, the chunk offset does not match the file position, or file I/O fails
     */
    private void onNextInternal(R response) throws IOException {
        SnapshotData data = mDataGetter.apply(response);
        TermIndex termIndex = TermIndex.valueOf(data.getSnapshotTerm(), data.getSnapshotIndex());
        if (mTermIndex == null) {
            // First message: remember which snapshot is being sent and ask for offset 0.
            LOG.info("Downloading new snapshot {} from {}", termIndex, mSource);
            mTermIndex = termIndex;
            mTempFile = RaftJournalUtils.createTempSnapshotFile(mStorage);
            mTempFile.deleteOnExit();
            mStream.onNext(mMessageBuilder.apply(0L));
            return;
        }
        if (!termIndex.equals(mTermIndex)) {
            throw new IOException(String.format("Mismatched term index when downloading the snapshot. expected: %s actual: %s", mTermIndex, termIndex));
        }
        if (!data.hasChunk()) {
            throw new IOException(String.format("A chunk for file %s is missing from the response %s.", mTempFile, response));
        }
        if (mOutputStream == null) {
            LOG.info("Start writing to temporary file {}", mTempFile.getPath());
            mOutputStream = new FileOutputStream(mTempFile);
        }
        long position = mOutputStream.getChannel().position();
        if (position != data.getOffset()) {
            throw new IOException(String.format("Mismatched offset in file %d, expect %d, bytes written %d", position, data.getOffset(), mBytesWritten));
        }
        mOutputStream.write(data.getChunk().toByteArray());
        mBytesWritten += data.getChunk().size();
        LOG.debug("Written {} bytes to snapshot file {}", mBytesWritten, mTempFile.getPath());
        if (!data.getEof()) {
            // Acknowledge the bytes received so far.
            mStream.onNext(mMessageBuilder.apply(mBytesWritten));
            return;
        }
        LOG.debug("Completed writing to temporary file {} with size {}", mTempFile.getPath(), mOutputStream.getChannel().position());
        mOutputStream.close();
        mOutputStream = null;
        MD5Hash digest = MD5FileUtil.computeMd5ForFile(mTempFile);
        // Publish the snapshot before completing the future so that whoever is waiting on the
        // future can read it as soon as the future is done.
        mSnapshotToInstall = new SingleFileSnapshotInfo(new FileInfo(mTempFile.toPath(), digest), mTermIndex.getTerm(), mTermIndex.getIndex());
        mFuture.complete(mTermIndex);
        LOG.info("Finished copying snapshot to local file {}.", mTempFile);
        mStream.onCompleted();
    }

    /**
     * Fails the download with the error reported by the remote side.
     *
     * @param t the reported error
     */
    @Override
    public void onError(Throwable t) {
        fail(t);
    }

    /**
     * Called when the remote side closes its stream. If the download has not finished by then
     * (a file is still being written, or the future has not been completed), the download is
     * failed so that the future does not stay pending.
     */
    @Override
    public void onCompleted() {
        if (mOutputStream != null || !mFuture.isDone()) {
            fail(new IllegalStateException("Request completed with unfinished upload"));
        }
    }

    /**
     * Stores the outgoing request stream on which acknowledgements are sent.
     *
     * @param requestStream the outgoing stream
     */
    @Override
    public void beforeStart(ClientCallStreamObserver<S> requestStream) {
        mStream = requestStream;
    }

    /**
     * @return a future that completes with the term/index of the downloaded snapshot, or
     *         exceptionally if the download fails
     */
    public CompletableFuture<TermIndex> getFuture() {
        return mFuture;
    }

    /**
     * @return the downloaded snapshot, or {@code null} if the download has not finished
     */
    public SnapshotInfo getSnapshotToInstall() {
        return mSnapshotToInstall;
    }
}

