package org.apache.flink.fs.azurefs;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.azure.common.hadoop.BaseHadoopFsRecoverableFsDataOutputStream;
import org.apache.flink.fs.azure.common.hadoop.HadoopFsRecoverable;
import org.apache.flink.fs.shaded.hadoop3.com.ctc.wstx.cfg.InputConfigFlags;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.class */
public class AzureBlobFsRecoverableDataOutputStream extends BaseHadoopFsRecoverableFsDataOutputStream {
    private static final String RENAME = ".rename";
    private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);

    @VisibleForTesting
    static int minBufferLength = InputConfigFlags.CFG_XMLID_TYPING;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream$ABFSCommitter.class */
    public static class ABFSCommitter implements RecoverableFsDataOutputStream.Committer {
        private final FileSystem fs;
        private final HadoopFsRecoverable recoverable;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ABFSCommitter(FileSystem fileSystem, HadoopFsRecoverable hadoopFsRecoverable) {
            this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
            this.recoverable = (HadoopFsRecoverable) Preconditions.checkNotNull(hadoopFsRecoverable);
        }

        public void commit() throws IOException {
            Path tempFile = this.recoverable.tempFile();
            Path targetFile = this.recoverable.targetFile();
            long offset = this.recoverable.offset();
            FileStatus fileStatus = null;
            try {
                fileStatus = this.fs.getFileStatus(tempFile);
            } catch (FileNotFoundException e) {
            } catch (IOException e2) {
                throw new IOException("Cannot clean commit: Staging file does not exist.");
            }
            if (fileStatus == null) {
                if (!this.fs.exists(targetFile)) {
                    throw new IOException("Unrecoverable exception while trying to recover " + this.recoverable.tempFile());
                }
                return;
            }
            AzureBlobFsRecoverableDataOutputStream.LOG.debug("The srcStatus is {} and exp length is {}", Long.valueOf(fileStatus.getLen()), Long.valueOf(offset));
            if (fileStatus.getLen() != offset) {
                AzureBlobFsRecoverableDataOutputStream.LOG.error("The src file {} with length {} does not match the expected length {}", new Object[]{tempFile, Long.valueOf(fileStatus.getLen()), Long.valueOf(offset)});
                IOException iOException = new IOException("The src file " + tempFile + " with length " + fileStatus.getLen() + " does not match the expected length " + iOException);
                throw iOException;
            }
            try {
                this.fs.rename(tempFile, targetFile);
            } catch (IOException e3) {
                throw new IOException("Committing file by rename failed: " + tempFile + " to " + targetFile, e3);
            }
        }

        public void commitAfterRecovery() throws IOException {
            commit();
        }

        public RecoverableWriter.CommitRecoverable getRecoverable() {
            return this.recoverable;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AzureBlobFsRecoverableDataOutputStream(FileSystem fileSystem, Path path, Path path2) throws IOException {
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(path);
        LOG.debug("The targetFile is {}", path.getName());
        this.tempFile = (Path) Preconditions.checkNotNull(path2);
        LOG.debug("The tempFile is {}", path2.getName());
        this.out = fileSystem.create(path2);
    }

    @VisibleForTesting
    AzureBlobFsRecoverableDataOutputStream(FileSystem fileSystem, Path path, Path path2, FSDataOutputStream fSDataOutputStream) {
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(path);
        this.tempFile = (Path) Preconditions.checkNotNull(path2);
        this.out = fSDataOutputStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AzureBlobFsRecoverableDataOutputStream(FileSystem fileSystem, HadoopFsRecoverable hadoopFsRecoverable) throws IOException {
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(hadoopFsRecoverable.targetFile());
        this.tempFile = (Path) Preconditions.checkNotNull(hadoopFsRecoverable.tempFile());
        if (fileSystem.exists(this.tempFile)) {
            long len = fileSystem.getFileStatus(this.tempFile).getLen();
            LOG.info("The recoverable offset is {} and the file len is {}", Long.valueOf(hadoopFsRecoverable.offset()), Long.valueOf(len));
            if (len > hadoopFsRecoverable.offset()) {
                truncate(fileSystem, hadoopFsRecoverable);
            } else if (len < hadoopFsRecoverable.offset()) {
                LOG.error("Temp file length {} is less than the expected recoverable offset {}", Long.valueOf(len), Long.valueOf(hadoopFsRecoverable.offset()));
                hadoopFsRecoverable.offset();
                IOException iOException = new IOException("Unable to create recoverable outputstream as length of file " + len + " is less than recoverable offset " + iOException);
                throw iOException;
            }
        } else {
            LOG.error("The temp file is not found {}", this.tempFile);
            Path path = new Path(this.tempFile.toString() + ".rename");
            if (!fileSystem.exists(path)) {
                LOG.error("Unrecoverable error. As the required {} file is not found", this.tempFile);
                throw new IOException("Unable to recover the job as the expected " + this.tempFile + " file is not found");
            }
            LOG.info("Found the rename file. Probably a case where the rename did not happen {}", path);
            if (fileSystem.getFileStatus(path).getLen() != hadoopFsRecoverable.offset()) {
                LOG.error("Unrecoverable error. As the required {} file is not found", this.tempFile);
                throw new IOException("Unable to recover the job as the expected " + this.tempFile + " file is not found");
            }
            rename(fileSystem, path);
        }
        this.out = fileSystem.append(this.tempFile);
        if (this.out.getPos() == 0) {
            this.initialFileSize = fileSystem.getFileStatus(this.tempFile).getLen();
        }
        LOG.debug("Created a new OS for appending {}", this.tempFile);
    }

    private void truncate(FileSystem fileSystem, HadoopFsRecoverable hadoopFsRecoverable) throws IOException {
        Path path = new Path(this.tempFile.toString() + ".rename");
        try {
            LOG.info("Creating the temp rename file {} for truncating the tempFile {}", path, this.tempFile);
            FSDataOutputStream create = fileSystem.create(path);
            LOG.info("Opening the tempFile {} for truncate", this.tempFile);
            FSDataInputStream open = fileSystem.open(this.tempFile);
            long offset = hadoopFsRecoverable.offset();
            long j = 0;
            while (offset != 0) {
                byte[] bArr = ((long) minBufferLength) < offset ? new byte[minBufferLength] : new byte[(int) offset];
                int read = open.read(bArr, 0, bArr.length);
                if (read != -1) {
                    offset -= read;
                    LOG.info("Bytes remaining to read {}", Long.valueOf(offset));
                    create.write(bArr, 0, read);
                    j += read;
                    LOG.info("Successfully wrote {} bytes of data", Long.valueOf(j));
                } else {
                    LOG.debug("Reached the end of the file");
                    offset = 0;
                }
            }
            LOG.info("Closing the temp rename file {}", path);
            create.close();
            if (open != null) {
                LOG.debug("Closing the input stream");
                open.close();
            }
            try {
                LOG.info("Deleting the actual temp file {}", this.tempFile);
                fileSystem.delete(this.tempFile, false);
                rename(fileSystem, path);
            } catch (IOException e) {
                LOG.error("Unable to recover. Error while deleting the temp file {}", this.tempFile);
                throw e;
            }
        } catch (IOException e2) {
            LOG.error("Unable to recover. Exception while trying to truncate the temp file {}", this.tempFile);
            throw e2;
        }
    }

    private void rename(FileSystem fileSystem, Path path) throws IOException {
        LOG.info("Renaming the temp rename file {} back to tempFile {}", path, this.tempFile);
        try {
            if (fileSystem.rename(path, this.tempFile)) {
                LOG.info("Rename was successful");
            } else {
                LOG.error("Unable to recover. Rename operation failed {} to {}", path, this.tempFile);
                throw new IOException("Unable to recover. Rename operation failed");
            }
        } catch (IOException e) {
            LOG.error("Unable to recover. Renaming of tempFile did not happen after truncating {} to {}", path, this.tempFile);
            throw e;
        }
    }

    @Override // org.apache.flink.fs.azure.common.hadoop.BaseHadoopFsRecoverableFsDataOutputStream
    public void sync() throws IOException {
        this.out.hsync();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RecoverableFsDataOutputStream.Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable hadoopFsRecoverable) {
        return new ABFSCommitter(this.fs, hadoopFsRecoverable);
    }
}
