package alluxio.master.journal.raft;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/journal/raft/SnapshotUploader.class */
public class SnapshotUploader<S, R> implements StreamObserver<R>, ClientResponseObserver<S, R> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotUploader.class);
    private static final int SNAPSHOT_CHUNK_SIZE = (int) ServerConfiguration.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_REPLICATION_CHUNK_SIZE);
    private final Function<SnapshotData, S> mDataMessageBuilder;
    private final Function<R, Long> mOffsetGetter;
    private final File mSnapshotFile;
    private final long mLength;
    private final SnapshotInfo mSnapshotInfo;
    private StreamObserver<S> mStream;
    private long mOffset = 0;
    private final CompletableFuture<SnapshotInfo> mCompletionFuture = new CompletableFuture<>();

    public static SnapshotUploader<DownloadSnapshotPResponse, DownloadSnapshotPRequest> forLeader(SimpleStateMachineStorage simpleStateMachineStorage, SnapshotInfo snapshotInfo, StreamObserver<DownloadSnapshotPResponse> streamObserver) {
        return new SnapshotUploader<>(simpleStateMachineStorage, snapshotInfo, streamObserver, snapshotData -> {
            return DownloadSnapshotPResponse.getDefaultInstance().toBuilder().setData(snapshotData).build();
        }, (v0) -> {
            return v0.getOffsetReceived();
        });
    }

    public static SnapshotUploader<UploadSnapshotPRequest, UploadSnapshotPResponse> forFollower(SimpleStateMachineStorage simpleStateMachineStorage, SnapshotInfo snapshotInfo) {
        return new SnapshotUploader<>(simpleStateMachineStorage, snapshotInfo, null, snapshotData -> {
            return UploadSnapshotPRequest.getDefaultInstance().toBuilder().setData(snapshotData).build();
        }, (v0) -> {
            return v0.getOffsetReceived();
        });
    }

    private SnapshotUploader(SimpleStateMachineStorage simpleStateMachineStorage, SnapshotInfo snapshotInfo, StreamObserver<S> streamObserver, Function<SnapshotData, S> function, Function<R, Long> function2) {
        this.mSnapshotInfo = snapshotInfo;
        this.mDataMessageBuilder = function;
        this.mOffsetGetter = function2;
        this.mSnapshotFile = simpleStateMachineStorage.getSnapshotFile(snapshotInfo.getTerm(), snapshotInfo.getIndex());
        this.mLength = this.mSnapshotFile.length();
        this.mStream = streamObserver;
    }

    public void onNext(R r) {
        try {
            onNextInternal(r);
        } catch (Exception e) {
            LOG.error("Error occurred while sending snapshot", e);
            this.mStream.onError(e);
        }
    }

    private void onNextInternal(R r) throws IOException {
        LOG.debug("Received request {}", r);
        if (this.mStream == null) {
            throw new IllegalStateException("No request stream assigned");
        }
        if (!this.mSnapshotFile.exists()) {
            throw new FileNotFoundException(String.format("Snapshot file %s does not exist", this.mSnapshotFile.getPath()));
        }
        long longValue = this.mOffsetGetter.apply(r).longValue();
        if (this.mOffset != longValue) {
            throw new InvalidArgumentException(String.format("Received mismatched offset: %d. Expect %d", Long.valueOf(longValue), Long.valueOf(this.mOffset)));
        }
        LOG.debug("Streaming data at {}", Long.valueOf(this.mOffset));
        FileInputStream fileInputStream = new FileInputStream(this.mSnapshotFile);
        Throwable th = null;
        try {
            fileInputStream.skip(this.mOffset);
            boolean z = false;
            int i = SNAPSHOT_CHUNK_SIZE;
            long j = this.mLength - this.mOffset;
            if (j <= SNAPSHOT_CHUNK_SIZE) {
                z = true;
                i = (int) j;
            }
            byte[] bArr = new byte[i];
            IOUtils.readFully(fileInputStream, bArr);
            LOG.debug("Read {} bytes from file {}", Integer.valueOf(i), this.mSnapshotFile);
            this.mStream.onNext(this.mDataMessageBuilder.apply(SnapshotData.newBuilder().setOffset(this.mOffset).setEof(z).setChunk(UnsafeByteOperations.unsafeWrap(bArr)).setSnapshotTerm(this.mSnapshotInfo.getTerm()).setSnapshotIndex(this.mSnapshotInfo.getIndex()).build()));
            this.mOffset += i;
            LOG.debug("Uploaded total {} bytes of file {}", Long.valueOf(this.mOffset), this.mSnapshotFile);
            if (fileInputStream != null) {
                if (0 == 0) {
                    fileInputStream.close();
                    return;
                }
                try {
                    fileInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fileInputStream != null) {
                if (0 != 0) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th3;
        }
    }

    public void onError(Throwable th) {
        LOG.error("Error sending snapshot {} at {}", new Object[]{this.mSnapshotFile, Long.valueOf(this.mOffset), th});
        this.mStream.onError(th);
        this.mCompletionFuture.completeExceptionally(th);
    }

    public void onCompleted() {
        LOG.debug("Received onComplete for {}", this.mSnapshotInfo);
        this.mStream.onCompleted();
        this.mCompletionFuture.complete(this.mSnapshotInfo);
    }

    public CompletableFuture<SnapshotInfo> getCompletionFuture() {
        return this.mCompletionFuture;
    }

    public void beforeStart(ClientCallStreamObserver<S> clientCallStreamObserver) {
        this.mStream = clientCallStreamObserver;
    }
}
