package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A checkpoint state output stream that buffers written bytes in memory and flushes them into an
 * output stream obtained from a {@link FileMergingSnapshotManagerProxy}.
 *
 * <p>The underlying stream is requested lazily, on the first flush. At that moment the stream's
 * current position is recorded as the start of this stream's data; from then on only the number
 * of bytes written relative to that start is tracked. {@link #closeAndGetHandle()} passes the
 * file path, the start position and the relative byte count to the proxy, which creates the
 * resulting {@link SegmentFileStateHandle}.
 *
 * <p>Only {@link #closeAndGetHandle()} synchronizes on this instance; the write methods perform
 * no locking of their own.
 */
public class FileMergingCheckpointStateOutputStream extends FsCheckpointStreamFactory.FsCheckpointStateOutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(FileMergingCheckpointStateOutputStream.class);

    /**
     * Callbacks through which this stream obtains its underlying output stream and reports how it
     * was closed.
     */
    public interface FileMergingSnapshotManagerProxy {
        /**
         * Provides the output stream to write into together with the path of the file behind it.
         *
         * @return a tuple of the output stream ({@code f0}) and its file path ({@code f1})
         * @throws IOException if the stream cannot be provided
         */
        Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException;

        /**
         * Invoked when the stream is closed successfully, to create the resulting state handle.
         *
         * @param filePath path of the file that was written to
         * @param startPos position in the file at which this stream's data starts
         * @param stateSize number of bytes this stream wrote after {@code startPos}
         * @return the state handle describing the written data
         * @throws IOException if the handle cannot be created
         */
        SegmentFileStateHandle closeStreamAndCreateStateHandle(Path filePath, long startPos, long stateSize) throws IOException;

        /**
         * Invoked when the stream is closed without producing a state handle, or when producing
         * the handle failed.
         *
         * @throws IOException if the cleanup fails
         */
        void closeStreamExceptionally() throws IOException;
    }

    private final FileMergingSnapshotManagerProxy fileMergingSnapshotManagerProxy;

    /** Set once the stream has been closed, either successfully or exceptionally. */
    private volatile boolean closed;

    /** Path of the underlying file; {@code null} until the output stream has been initialized. */
    private Path filePath;

    /** The underlying stream; {@code null} until the first flush requests it from the proxy. */
    @Nullable
    private FSDataOutputStream outputStream;

    /** Position of the underlying stream at the moment it was obtained. */
    private long startPos;

    /** Number of bytes already handed to the underlying stream by this instance. */
    private long curPosRelative;

    /** In-memory buffer that collects small writes before they are flushed. */
    private final byte[] writeBuffer;

    /** Number of valid bytes currently held in {@link #writeBuffer}. */
    private int bufferPos;

    /**
     * Creates a stream with the given write buffer capacity.
     *
     * @param bufferSize capacity of the in-memory write buffer, in bytes
     * @param fileMergingSnapshotManagerProxy proxy used to obtain the underlying stream and to
     *     report how this stream was closed
     */
    public FileMergingCheckpointStateOutputStream(int bufferSize, FileMergingSnapshotManagerProxy fileMergingSnapshotManagerProxy) {
        // NOTE(review): the null/-1 arguments suggest the parent's own file handling is unused
        // here; confirm against the parent constructor, which is not visible in this file.
        super(null, null, bufferSize, -1);
        this.fileMergingSnapshotManagerProxy = fileMergingSnapshotManagerProxy;
        this.writeBuffer = new byte[bufferSize];
    }

    /** Requests the underlying stream from the proxy and records where this stream's data starts. */
    private void initializeOutputStream() throws IOException {
        Tuple2<FSDataOutputStream, Path> streamAndPath = fileMergingSnapshotManagerProxy.providePhysicalFile();
        outputStream = streamAndPath.f0;
        startPos = outputStream.getPos();
        filePath = streamAndPath.f1;
    }

    /**
     * Returns the number of bytes written to this stream so far, counting both flushed and still
     * buffered bytes. The value is relative to this stream's start, not to the underlying file.
     */
    @Override
    public long getPos() throws IOException {
        return bufferPos + curPosRelative;
    }

    /**
     * Writes a single byte, flushing the buffer first if it is full.
     *
     * @param b the byte to write (only the low eight bits are used)
     * @throws IOException if the buffer must be flushed and the stream is closed or the flush fails
     */
    @Override
    public void write(int b) throws IOException {
        if (bufferPos >= writeBuffer.length) {
            flushToFile();
            if (writeBuffer.length == 0) {
                // A zero-capacity buffer can never hold the byte; write it through directly,
                // mirroring what the bulk write does for requests that do not fit the buffer.
                outputStream.write(b);
                curPosRelative++;
                return;
            }
        }
        writeBuffer[bufferPos++] = (byte) b;
    }

    /**
     * Writes {@code len} bytes from {@code b} starting at {@code off}.
     *
     * <p>Requests at least as large as the buffer bypass it: pending bytes are flushed and the
     * data is written directly to the underlying stream. Smaller requests are copied into the
     * buffer, flushing once in between if they do not fit into the remaining space.
     *
     * @param b source array
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if a flush is required and the stream is closed or the write fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len >= writeBuffer.length) {
            // Preserve ordering: buffered bytes must reach the file before the direct write.
            flushToFile();
            outputStream.write(b, off, len);
            curPosRelative += len;
            return;
        }

        int remaining = writeBuffer.length - bufferPos;
        if (len > remaining) {
            // Fill the buffer completely, flush it, and continue with the rest below.
            System.arraycopy(b, off, writeBuffer, bufferPos, remaining);
            off += remaining;
            len -= remaining;
            bufferPos += remaining;
            flushToFile();
        }

        System.arraycopy(b, off, writeBuffer, bufferPos, len);
        bufferPos += len;
    }

    /**
     * Flushes buffered bytes to the underlying stream, but only if that stream has already been
     * obtained; this call never triggers the request for it.
     */
    @Override
    public void flush() throws IOException {
        if (outputStream != null) {
            flushToFile();
        }
    }

    /** Calls {@code sync()} on the underlying stream if it has been obtained; otherwise a no-op. */
    @Override
    public void sync() throws IOException {
        if (outputStream != null) {
            outputStream.sync();
        }
    }

    /**
     * Flushes all pending data and asks the proxy to create the state handle for what was written.
     *
     * @return the handle created by the proxy, or {@code null} if nothing was buffered and the
     *     underlying stream was never obtained
     * @throws IOException if the stream was already closed, or if flushing or creating the handle
     *     fails; in the latter case the proxy is notified via {@code closeStreamExceptionally()}
     *     and any failure of that notification is attached as a suppressed exception
     */
    @Override
    @Nullable
    public SegmentFileStateHandle closeAndGetHandle() throws IOException {
        // Nothing was ever written: there is no data to describe.
        if (outputStream == null && bufferPos == 0) {
            return null;
        }

        synchronized (this) {
            if (closed) {
                throw new IOException("Stream has already been closed and discarded.");
            }
            try {
                flushToFile();

                // Best effort to make subsequent write() calls hit the (now failing) flush path.
                bufferPos = writeBuffer.length;

                return fileMergingSnapshotManagerProxy.closeStreamAndCreateStateHandle(filePath, startPos, curPosRelative);
            } catch (Exception e) {
                IOException failure = new IOException("Could not flush to file and close the file system output stream to " + filePath + " in order to obtain the stream state handle", e);
                try {
                    fileMergingSnapshotManagerProxy.closeStreamExceptionally();
                } catch (Exception cleanupFailure) {
                    // Keep the root cause visible instead of letting the cleanup error replace it.
                    failure.addSuppressed(cleanupFailure);
                }
                throw failure;
            } finally {
                closed = true;
            }
        }
    }

    /**
     * Closes the stream without producing a state handle and notifies the proxy. Calling this on
     * an already closed stream does nothing. Failures of the notification are logged, not thrown.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;

        // Best effort to make subsequent write() calls hit the (now failing) flush path.
        bufferPos = writeBuffer.length;

        try {
            fileMergingSnapshotManagerProxy.closeStreamExceptionally();
        } catch (Throwable t) {
            LOG.warn("Could not close the state stream for {}.", filePath, t);
        }
    }

    /**
     * Writes all buffered bytes to the underlying stream, obtaining that stream from the proxy
     * first if this is the first flush.
     *
     * @throws IOException if the stream is closed, or if obtaining or writing to the underlying
     *     stream fails
     */
    @Override
    public void flushToFile() throws IOException {
        if (closed) {
            throw new IOException("Cannot call flushToFile() to a closed stream.");
        }
        if (outputStream == null) {
            initializeOutputStream();
        }
        if (bufferPos > 0) {
            outputStream.write(writeBuffer, 0, bufferPos);
            curPosRelative += bufferPos;
            bufferPos = 0;
        }
    }

    /** Returns the path of the underlying file, or {@code null} if it has not been obtained yet. */
    @VisibleForTesting
    @Nullable
    public Path getFilePath() {
        return filePath;
    }
}
