package org.apache.flink.fs.osshadoop.common;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.Truncate;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.util.VersionInfo;

/**
 * Recoverable output stream that writes into a temporary (staging) file on a Hadoop
 * {@link FileSystem} and publishes it by renaming the staging file to the target file.
 *
 * <p>Resuming and committing after recovery rely on Hadoop's {@code truncate} call, which is
 * looked up reflectively and only used when {@code HadoopUtils.isMinHadoopVersion(2, 7)} holds,
 * and on lease recovery through {@link DistributedFileSystem}.
 */
@Internal
public class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataOutputStream {
    /** Maximum time, in milliseconds, to wait for a file to be reported closed after lease recovery. */
    private static final long LEASE_TIMEOUT = 100000;

    /** Pause, in milliseconds, between two checks of whether the file has been closed. */
    private static final long LEASE_POLL_INTERVAL = 500L;

    /** Lazily resolved reflective handle to {@code FileSystem.truncate(Path, long)}. */
    private static Method truncateHandle;

    /**
     * Committer that publishes the staging file of a {@link HadoopFsRecoverable} by renaming it
     * to the target file.
     */
    public static class HadoopFsCommitter implements RecoverableFsDataOutputStream.Committer {
        private final FileSystem fs;
        private final HadoopFsRecoverable recoverable;

        /**
         * @param fileSystem file system holding the staging and target files; must not be null
         * @param hadoopFsRecoverable description of the file to commit; must not be null
         */
        public HadoopFsCommitter(FileSystem fileSystem, HadoopFsRecoverable hadoopFsRecoverable) {
            this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
            this.recoverable = (HadoopFsRecoverable) Preconditions.checkNotNull(hadoopFsRecoverable);
        }

        /**
         * Commits the staging file, requiring that its length equals the recoverable's offset.
         *
         * @throws IOException if the staging file's status cannot be read, if its length differs
         *     from the expected offset, or if the rename throws
         */
        public void commit() throws IOException {
            final Path src = this.recoverable.tempFile();
            final Path dest = this.recoverable.targetFile();
            final long expectedLength = this.recoverable.offset();

            // Only the status lookup is guarded here, so that the length-mismatch and rename
            // failures below surface with their own messages instead of being masked.
            final FileStatus srcStatus;
            try {
                srcStatus = this.fs.getFileStatus(src);
            } catch (IOException e) {
                throw new IOException("Cannot clean commit: Staging file does not exist.", e);
            }

            if (srcStatus.getLen() != expectedLength) {
                throw new IOException("Cannot clean commit: File has trailing junk data.");
            }

            try {
                this.fs.rename(src, dest);
            } catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
        }

        /**
         * Commits the staging file after a recovery. If the staging file is longer than the
         * recoverable's offset it is truncated first; if the staging file does not exist, nothing
         * is renamed.
         *
         * @throws IOException if the staging file's status cannot be accessed, or if truncation
         *     or the rename fails
         */
        public void commitAfterRecovery() throws IOException {
            final Path src = this.recoverable.tempFile();
            final Path dest = this.recoverable.targetFile();
            final long expectedLength = this.recoverable.offset();

            FileStatus srcStatus = null;
            try {
                srcStatus = this.fs.getFileStatus(src);
            } catch (FileNotFoundException ignored) {
                // A missing staging file is handled below via the null status.
            } catch (IOException e) {
                throw new IOException(
                        "Committing during recovery failed: Could not access status of source file.", e);
            }

            if (srcStatus == null) {
                // The existence check is kept because it may itself throw an IOException.
                if (!this.fs.exists(dest)) {
                    // Neither the staging file nor the target file exists.
                    // No action is taken for this case at present.
                }
                return;
            }

            if (srcStatus.getLen() > expectedLength) {
                // Cut off everything written past the recoverable's offset before publishing.
                HadoopRecoverableFsDataOutputStream.safelyTruncateFile(this.fs, src, this.recoverable);
            }

            try {
                this.fs.rename(src, dest);
            } catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
        }

        /** Returns the recoverable this committer was created for. */
        public RecoverableWriter.CommitRecoverable getRecoverable() {
            return this.recoverable;
        }
    }

    /**
     * Opens a new stream by creating the staging file.
     *
     * @param fileSystem file system to write to; must not be null
     * @param path target file; must not be null
     * @param path2 staging (temp) file that is created and written to; must not be null
     * @throws IOException if creating the staging file fails
     */
    public HadoopRecoverableFsDataOutputStream(FileSystem fileSystem, Path path, Path path2) throws IOException {
        ensureTruncateInitialized();
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(path);
        this.tempFile = (Path) Preconditions.checkNotNull(path2);
        this.out = fileSystem.create(path2);
    }

    /**
     * Test-only constructor that uses the supplied output stream instead of creating one and
     * does not initialize the truncate handle.
     */
    @VisibleForTesting
    HadoopRecoverableFsDataOutputStream(FileSystem fileSystem, Path path, Path path2, FSDataOutputStream fSDataOutputStream) {
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(path);
        this.tempFile = (Path) Preconditions.checkNotNull(path2);
        this.out = fSDataOutputStream;
    }

    /**
     * Resumes writing from a recoverable: truncates the staging file to the recoverable's offset
     * and reopens it for appending.
     *
     * @param fileSystem file system to write to; must not be null
     * @param hadoopFsRecoverable recoverable describing target file, staging file and offset
     * @throws IOException if truncating or appending fails, or if the position of the reopened
     *     stream differs from the recoverable's offset
     */
    public HadoopRecoverableFsDataOutputStream(FileSystem fileSystem, HadoopFsRecoverable hadoopFsRecoverable) throws IOException {
        ensureTruncateInitialized();
        this.fs = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.targetFile = (Path) Preconditions.checkNotNull(hadoopFsRecoverable.targetFile());
        this.tempFile = (Path) Preconditions.checkNotNull(hadoopFsRecoverable.tempFile());

        safelyTruncateFile(fileSystem, this.tempFile, hadoopFsRecoverable);
        this.out = fileSystem.append(this.tempFile);

        // Verify that the reopened stream really starts at the requested offset.
        final long position = this.out.getPos();
        if (position != hadoopFsRecoverable.offset()) {
            IOUtils.closeQuietly(this.out);
            throw new IOException("Truncate failed: " + this.tempFile + " (requested=" + hadoopFsRecoverable.offset() + " ,size=" + position + ')');
        }
    }

    /** Creates a {@link HadoopFsCommitter} on this stream's file system for the given recoverable. */
    public RecoverableFsDataOutputStream.Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable hadoopFsRecoverable) {
        return new HadoopFsCommitter(this.fs, hadoopFsRecoverable);
    }

    /**
     * Recovers the lease on {@code path} and truncates the file to the recoverable's offset. If
     * the truncate call returns {@code false}, lease recovery is run a second time.
     *
     * @throws IOException if lease recovery or truncation fails
     */
    public static void safelyTruncateFile(FileSystem fileSystem, Path path, HadoopFsRecoverable hadoopFsRecoverable) throws IOException {
        ensureTruncateInitialized();
        revokeLeaseByFileSystem(fileSystem, path);

        final boolean truncated;
        try {
            truncated = truncate(fileSystem, path, hadoopFsRecoverable.offset());
        } catch (Exception e) {
            throw new IOException("Problem while truncating file: " + path, e);
        }

        if (!truncated) {
            // Kept outside the try block above so that a lease-recovery failure is not
            // reported as a truncation problem.
            revokeLeaseByFileSystem(fileSystem, path);
        }
    }

    /**
     * Resolves the reflective handle to {@code FileSystem.truncate(Path, long)} if the Hadoop
     * version check passes and the handle has not been resolved yet.
     *
     * @throws FlinkRuntimeException if no public truncate method can be found
     */
    private static void ensureTruncateInitialized() throws FlinkRuntimeException {
        if (HadoopUtils.isMinHadoopVersion(2, 7) && truncateHandle == null) {
            final Method method;
            try {
                // Literal method name; avoids depending on an unrelated class for the constant.
                method = FileSystem.class.getMethod("truncate", Path.class, Long.TYPE);
            } catch (NoSuchMethodException e) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.", e);
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
            }
            truncateHandle = method;
        }
    }

    /**
     * Invokes the reflective truncate handle on the given file.
     *
     * @param fileSystem file system to invoke truncate on
     * @param path file to truncate
     * @param j length to truncate the file to
     * @return the boolean returned by the underlying truncate call
     * @throws IllegalStateException if the Hadoop version check fails or the handle is not initialized
     * @throws IOException if the truncate call fails or cannot be invoked
     */
    private static boolean truncate(FileSystem fileSystem, Path path, long j) throws IOException {
        if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
            throw new IllegalStateException("Truncation is not available in hadoop version < 2.7 , You are on Hadoop " + VersionInfo.getVersion());
        }
        if (truncateHandle == null) {
            throw new IllegalStateException("Truncation handle has not been initialized");
        }
        try {
            return ((Boolean) truncateHandle.invoke(fileSystem, path, Long.valueOf(j))).booleanValue();
        } catch (InvocationTargetException e) {
            // Surface the exception raised by the truncate call itself.
            ExceptionUtils.rethrowIOException(e.getTargetException());
            return false;
        } catch (Throwable th) {
            throw new IOException("Truncation of file failed because of access/linking problems with Hadoop's truncate call. This is most likely a dependency conflict or class loading problem.", th);
        }
    }

    /**
     * Runs lease recovery for {@code path}. For a {@link ViewFileSystem} the path is resolved
     * first and recovery runs against the file system of the resolved path.
     *
     * @return the result of {@link #waitUntilLeaseIsRevoked(FileSystem, Path)}
     * @throws IOException if resolving the path or recovering the lease fails
     */
    private static boolean revokeLeaseByFileSystem(FileSystem fileSystem, Path path) throws IOException {
        if (fileSystem instanceof ViewFileSystem) {
            final Path resolvedPath = ((ViewFileSystem) fileSystem).resolvePath(path);
            final FileSystem resolvedFs = resolvedPath.getFileSystem(fileSystem.getConf());
            return waitUntilLeaseIsRevoked(resolvedFs, resolvedPath);
        }
        return waitUntilLeaseIsRevoked(fileSystem, path);
    }

    /**
     * Triggers lease recovery on {@code path} and polls until the file is reported closed or
     * {@link #LEASE_TIMEOUT} milliseconds have elapsed.
     *
     * @param fileSystem must be a {@link DistributedFileSystem}
     * @param path file whose lease is recovered
     * @return {@code true} if the file was reported closed, {@code false} if the wait timed out
     * @throws IllegalStateException if {@code fileSystem} is not a {@link DistributedFileSystem}
     * @throws IOException if a file system call fails or the waiting thread is interrupted
     */
    private static boolean waitUntilLeaseIsRevoked(FileSystem fileSystem, Path path) throws IOException {
        Preconditions.checkState(fileSystem instanceof DistributedFileSystem);
        final DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
        dfs.recoverLease(path);

        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && deadline.hasTimeLeft()) {
            try {
                Thread.sleep(LEASE_POLL_INTERVAL);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException("Recovering the lease failed: ", e);
            }
            isClosed = dfs.isFileClosed(path);
        }
        return isClosed;
    }
}
