/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streams a snapshot file to a peer one chunk at a time, driven by the offsets the peer
 * acknowledges.
 *
 * <p>Each message received through {@link #onNext(Object)} carries the offset the peer has
 * received so far. If it matches the uploader's own offset, the next chunk of at most
 * {@code SNAPSHOT_CHUNK_SIZE} bytes is read from the snapshot file and sent on the outgoing
 * stream.
 *
 * @param <S> type of the messages sent on the outgoing stream (wraps a {@link SnapshotData})
 * @param <R> type of the messages received from the peer (carries the acknowledged offset)
 */
public class SnapshotUploader<S, R>
        implements StreamObserver<R>,
        ClientResponseObserver<S, R> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotUploader.class);
    /** Maximum number of snapshot bytes carried by a single message. */
    private static final int SNAPSHOT_CHUNK_SIZE = (int) ServerConfiguration.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_REPLICATION_CHUNK_SIZE);

    /** Wraps a {@link SnapshotData} chunk into the outgoing message type. */
    private final Function<SnapshotData, S> mDataMessageBuilder;
    /** Extracts the acknowledged offset from an incoming message. */
    private final Function<R, Long> mOffsetGetter;
    private final File mSnapshotFile;
    /** Length of the snapshot file, captured once at construction time. */
    private final long mLength;
    private final SnapshotInfo mSnapshotInfo;
    /** Number of bytes sent so far; also the file position of the next chunk. */
    private long mOffset = 0L;
    /** Outgoing stream; may be null until {@link #beforeStart} assigns it. */
    private StreamObserver<S> mStream;

    /**
     * Creates an uploader that wraps chunks in {@link DownloadSnapshotPResponse} messages and reads
     * acknowledged offsets from {@link DownloadSnapshotPRequest} messages.
     *
     * @param storage the storage used to locate the snapshot file
     * @param snapshot the snapshot to send
     * @param stream the stream on which the chunks are sent
     * @return the uploader
     */
    public static SnapshotUploader<DownloadSnapshotPResponse, DownloadSnapshotPRequest> forLeader(SimpleStateMachineStorage storage, SnapshotInfo snapshot, StreamObserver<DownloadSnapshotPResponse> stream) {
        return new SnapshotUploader<DownloadSnapshotPResponse, DownloadSnapshotPRequest>(storage, snapshot, stream, data -> DownloadSnapshotPResponse.getDefaultInstance().toBuilder().setData(data).build(), DownloadSnapshotPRequest::getOffsetReceived);
    }

    /**
     * Creates an uploader that wraps chunks in {@link UploadSnapshotPRequest} messages and reads
     * acknowledged offsets from {@link UploadSnapshotPResponse} messages. No outgoing stream is
     * set here; it is assigned later through {@link #beforeStart}.
     *
     * @param storage the storage used to locate the snapshot file
     * @param snapshot the snapshot to send
     * @return the uploader
     */
    public static SnapshotUploader<UploadSnapshotPRequest, UploadSnapshotPResponse> forFollower(SimpleStateMachineStorage storage, SnapshotInfo snapshot) {
        return new SnapshotUploader<UploadSnapshotPRequest, UploadSnapshotPResponse>(storage, snapshot, null, data -> UploadSnapshotPRequest.getDefaultInstance().toBuilder().setData(data).build(), UploadSnapshotPResponse::getOffsetReceived);
    }

    private SnapshotUploader(SimpleStateMachineStorage storage, SnapshotInfo snapshot, StreamObserver<S> stream, Function<SnapshotData, S> buildFunc, Function<R, Long> offsetGetter) {
        mSnapshotInfo = snapshot;
        mDataMessageBuilder = buildFunc;
        mOffsetGetter = offsetGetter;
        mSnapshotFile = storage.getSnapshotFile(snapshot.getTerm(), snapshot.getIndex());
        mLength = mSnapshotFile.length();
        mStream = stream;
    }

    /**
     * Handles an acknowledgement from the peer by sending the next chunk. Any failure is logged
     * and forwarded to the outgoing stream via {@code onError}.
     *
     * @param value the message received from the peer
     * @throws RuntimeException if sending fails and no outgoing stream is assigned to report to
     */
    @Override
    public void onNext(R value) {
        try {
            onNextInternal(value);
        } catch (Exception e) {
            LOG.error("Error occurred while sending snapshot", e);
            StreamObserver<S> stream = mStream;
            if (stream == null) {
                // Nobody to notify: surface the real failure instead of masking it with a
                // NullPointerException from dereferencing the missing stream.
                throw e instanceof RuntimeException
                        ? (RuntimeException) e : new IllegalStateException(e);
            }
            stream.onError(e);
        }
    }

    /**
     * Validates the acknowledged offset, reads the next chunk from the snapshot file and sends it.
     *
     * @param value the message received from the peer
     * @throws IOException if the snapshot file is missing, the offset does not match, or the file
     *         cannot be read up to the expected position and length
     */
    private void onNextInternal(R value) throws IOException {
        LOG.debug("Received request {}", value);
        StreamObserver<S> stream = mStream;
        if (stream == null) {
            throw new IllegalStateException("No request stream assigned");
        }
        if (!mSnapshotFile.exists()) {
            throw new FileNotFoundException(String.format("Snapshot file %s does not exist", mSnapshotFile.getPath()));
        }
        long offsetReceived = mOffsetGetter.apply(value);
        if (mOffset != offsetReceived) {
            throw new InvalidArgumentException(String.format("Received mismatched offset: %d. Expect %d", offsetReceived, mOffset));
        }
        LOG.debug("Streaming data at {}", mOffset);
        try (InputStream is = new FileInputStream(mSnapshotFile)) {
            // InputStream.skip may skip fewer bytes than requested; skipFully either reaches
            // mOffset or throws, so a chunk is never read from the wrong position.
            IOUtils.skipFully(is, mOffset);
            boolean eof = false;
            int chunkSize = SNAPSHOT_CHUNK_SIZE;
            long available = mLength - mOffset;
            if (available <= (long) SNAPSHOT_CHUNK_SIZE) {
                // The remainder fits in one message: this is the final chunk.
                eof = true;
                chunkSize = (int) available;
            }
            byte[] buffer = new byte[chunkSize];
            IOUtils.readFully(is, buffer);
            LOG.debug("Read {} bytes from file {}", chunkSize, mSnapshotFile);
            SnapshotData chunk = SnapshotData.newBuilder()
                    .setOffset(mOffset)
                    .setEof(eof)
                    // The buffer is not touched after this point, so wrapping without a copy is safe.
                    .setChunk(UnsafeByteOperations.unsafeWrap(buffer))
                    .setSnapshotTerm(mSnapshotInfo.getTerm())
                    .setSnapshotIndex(mSnapshotInfo.getIndex())
                    .build();
            stream.onNext(mDataMessageBuilder.apply(chunk));
            mOffset += (long) chunkSize;
            LOG.debug("Uploaded total {} bytes of file {}", mOffset, mSnapshotFile);
        }
    }

    /**
     * Logs an error reported by the peer. The outgoing stream is not touched.
     *
     * @param t the error reported on the incoming stream
     */
    @Override
    public void onError(Throwable t) {
        // The trailing Throwable is treated by SLF4J as the exception to log, not a placeholder.
        LOG.error("Error sending snapshot {} at {}", mSnapshotFile, mOffset, t);
    }

    /**
     * Completes the outgoing stream once the peer completes the incoming one.
     */
    @Override
    public void onCompleted() {
        LOG.debug("Received onComplete");
        mStream.onCompleted();
    }

    /**
     * Records the request stream as the outgoing stream used to send chunks.
     *
     * @param requestStream the stream on which the chunks are sent
     */
    @Override
    public void beforeStart(ClientCallStreamObserver<S> requestStream) {
        mStream = requestStream;
    }
}

