package org.apache.hudi.common.table.log;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageSchemes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/common/table/log/HoodieLogFormatWriter.class */
public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatWriter.class);
    private HoodieLogFile logFile;
    private FSDataOutputStream output;
    private final HoodieStorage storage;
    private final long sizeThreshold;
    private final Integer bufferSize;
    private final Short replication;
    private final String rolloverLogWriteToken;
    final HoodieLogFileWriteCallback logFileWriteCallback;
    private boolean closed = false;
    private transient Thread shutdownThread = null;
    private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";

    public HoodieLogFormatWriter(HoodieStorage hoodieStorage, HoodieLogFile hoodieLogFile, Integer num, Short sh, Long l, String str, HoodieLogFileWriteCallback hoodieLogFileWriteCallback) {
        this.storage = hoodieStorage;
        this.logFile = hoodieLogFile;
        this.sizeThreshold = l.longValue();
        this.bufferSize = Integer.valueOf(num != null ? num.intValue() : hoodieStorage.getDefaultBufferSize());
        this.replication = Short.valueOf(sh != null ? sh.shortValue() : hoodieStorage.getDefaultReplication(hoodieLogFile.getPath().getParent()));
        this.rolloverLogWriteToken = str;
        this.logFileWriteCallback = hoodieLogFileWriteCallback;
        addShutDownHook();
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Writer
    public HoodieLogFile getLogFile() {
        return this.logFile;
    }

    public long getSizeThreshold() {
        return this.sizeThreshold;
    }

    private FSDataOutputStream getOutputStream() throws IOException, InterruptedException {
        if (this.output == null) {
            Path path = new Path(this.logFile.getPath().toUri());
            FileSystem fileSystem = (FileSystem) this.storage.getFileSystem();
            if (fileSystem.exists(path)) {
                boolean isAppendSupported = StorageSchemes.isAppendSupported(fileSystem.getScheme());
                boolean preLogFileOpen = isAppendSupported ? this.logFileWriteCallback.preLogFileOpen(this.logFile) : false;
                if (preLogFileOpen) {
                    LOG.info(this.logFile + " exists. Appending to existing file");
                    try {
                        this.output = fileSystem.append(path, this.bufferSize.intValue());
                    } catch (IOException e) {
                        if (!e.getMessage().toLowerCase().contains("not supported")) {
                            close();
                            throw e;
                        }
                        isAppendSupported = false;
                    } catch (RemoteException e2) {
                        LOG.warn("Remote Exception, attempting to handle or recover lease", e2);
                        handleAppendExceptionOrRecoverLease(path, e2);
                    }
                }
                if (!isAppendSupported || !preLogFileOpen) {
                    rollOver();
                    createNewFile();
                    LOG.info((isAppendSupported ? "Append not supported" : "Callback failed") + ". Rolling over to " + this.logFile);
                }
            } else {
                LOG.info(this.logFile + " does not exist. Create a new file");
                createNewFile();
            }
        }
        return this.output;
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Writer
    public AppendResult appendBlock(HoodieLogBlock hoodieLogBlock) throws IOException, InterruptedException {
        return appendBlocks(Collections.singletonList(hoodieLogBlock));
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Writer
    public AppendResult appendBlocks(List<HoodieLogBlock> list) throws IOException, InterruptedException {
        HoodieLogFormatVersion hoodieLogFormatVersion = new HoodieLogFormatVersion(1);
        FSDataOutputStream outputStream = getOutputStream();
        long pos = outputStream.getPos();
        long j = 0;
        FSDataOutputStream fSDataOutputStream = new FSDataOutputStream(outputStream, new FileSystem.Statistics(this.storage.getScheme()), pos);
        for (HoodieLogBlock hoodieLogBlock : list) {
            long size = fSDataOutputStream.size();
            fSDataOutputStream.write(HoodieLogFormat.MAGIC);
            byte[] logMetadataBytes = HoodieLogBlock.getLogMetadataBytes(hoodieLogBlock.getLogBlockHeader());
            byte[] contentBytes = hoodieLogBlock.getContentBytes(this.storage);
            byte[] logMetadataBytes2 = HoodieLogBlock.getLogMetadataBytes(hoodieLogBlock.getLogBlockFooter());
            fSDataOutputStream.writeLong(getLogBlockLength(contentBytes.length, logMetadataBytes.length, logMetadataBytes2.length));
            fSDataOutputStream.writeInt(hoodieLogFormatVersion.getVersion());
            fSDataOutputStream.writeInt(hoodieLogBlock.getBlockType().ordinal());
            fSDataOutputStream.write(logMetadataBytes);
            fSDataOutputStream.writeLong(contentBytes.length);
            fSDataOutputStream.write(contentBytes);
            fSDataOutputStream.write(logMetadataBytes2);
            fSDataOutputStream.writeLong(fSDataOutputStream.size() - size);
            if (fSDataOutputStream.size() == Integer.MAX_VALUE) {
                throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
            }
            j += fSDataOutputStream.size() - size;
        }
        flush();
        AppendResult appendResult = new AppendResult(this.logFile, pos, j);
        rolloverIfNeeded();
        return appendResult;
    }

    private int getLogBlockLength(int i, int i2, int i3) {
        return 8 + i2 + 8 + i + i3 + 8;
    }

    private void rolloverIfNeeded() throws IOException {
        if (getCurrentSize() > this.sizeThreshold) {
            LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", Long.valueOf(getCurrentSize()), Long.valueOf(this.sizeThreshold));
            rollOver();
        }
    }

    private void rollOver() throws IOException {
        closeStream();
        this.logFile = this.logFile.rollOver(this.storage, this.rolloverLogWriteToken);
        this.closed = false;
    }

    private void createNewFile() throws IOException {
        this.logFileWriteCallback.preLogFileCreate(this.logFile);
        this.output = new FSDataOutputStream(this.storage.create(this.logFile.getPath(), false, this.bufferSize, this.replication, 536870912L), new FileSystem.Statistics(this.storage.getScheme()));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (null != this.shutdownThread) {
            Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
        }
        this.logFileWriteCallback.preLogFileClose(this.logFile);
        try {
            closeStream();
        } finally {
            this.logFileWriteCallback.postLogFileClose(this.logFile);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeStream() throws IOException {
        if (this.output != null) {
            flush();
            this.output.close();
            this.output = null;
            this.closed = true;
        }
    }

    private void flush() throws IOException {
        if (this.output == null) {
            return;
        }
        this.output.flush();
        this.output.hsync();
    }

    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Writer
    public long getCurrentSize() throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
        }
        if (this.output == null) {
            return 0L;
        }
        return this.output.getPos();
    }

    private void addShutDownHook() {
        this.shutdownThread = new Thread() { // from class: org.apache.hudi.common.table.log.HoodieLogFormatWriter.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    HoodieLogFormatWriter.LOG.warn("running logformatwriter hook");
                    if (HoodieLogFormatWriter.this.output != null) {
                        HoodieLogFormatWriter.this.closeStream();
                    }
                } catch (Exception e) {
                    HoodieLogFormatWriter.LOG.warn(String.format("unable to close output stream for log file %s", HoodieLogFormatWriter.this.logFile), e);
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
    }

    private void handleAppendExceptionOrRecoverLease(Path path, RemoteException remoteException) throws IOException, InterruptedException {
        DistributedFileSystem distributedFileSystem = (FileSystem) this.storage.getFileSystem();
        if (remoteException.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
            LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", remoteException);
            rollOver();
            createNewFile();
            return;
        }
        if (remoteException.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
            LOG.warn("Another task executor writing to the same log file(" + this.logFile + ". Rolling over");
            rollOver();
            createNewFile();
            return;
        }
        if (!remoteException.getClassName().contentEquals(RecoveryInProgressException.class.getName()) || !(distributedFileSystem instanceof DistributedFileSystem)) {
            try {
                closeStream();
                throw new HoodieIOException("Failed to append to the output stream ", remoteException);
            } catch (Exception e) {
                LOG.warn("Failed to close the output stream for " + distributedFileSystem.getClass().getName() + " on path " + path + ". Rolling over to a new log file.");
                rollOver();
                createNewFile();
                return;
            }
        }
        LOG.warn("Trying to recover log on path " + path);
        if (HadoopFSUtils.recoverDFSFileLease(distributedFileSystem, path)) {
            LOG.warn("Recovered lease on path " + path);
            this.output = distributedFileSystem.append(path, this.bufferSize.intValue());
        } else {
            String str = "Failed to recover lease on path " + path;
            LOG.warn(str);
            throw new HoodieException(str, remoteException);
        }
    }
}
