/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.ArrayOps$;
import scala.reflect.ScalaSignature;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0005\u0005Eb\u0001B\n\u0015\u0001\rB\u0001\u0002\r\u0001\u0003\u0002\u0003\u0006I!\r\u0005\tk\u0001\u0011\t\u0011)A\u0005m!Aa\b\u0001B\u0001B\u0003%q\bC\u0003F\u0001\u0011\u0005a\tC\u0003M\u0001\u0011%Q\nC\u0004X\u0001\u0001\u0007I\u0011\u0002-\t\u000f=\u0004\u0001\u0019!C\u0005a\"1a\u000f\u0001Q!\neCqa\u001e\u0001A\u0002\u0013%\u0001\u0010C\u0004z\u0001\u0001\u0007I\u0011\u0002>\t\rq\u0004\u0001\u0015)\u0003O\u0011\u001di\b\u00011A\u0005\u0002yD\u0011\"!\u0002\u0001\u0001\u0004%\t!a\u0002\t\u000f\u0005-\u0001\u0001)Q\u0005\u007f\"9\u0011Q\u0002\u0001\u0005\u0002\u0005=\u0001bBA\u0013\u0001\u0011\u0005\u0011q\u0005\u0005\b\u0003W\u0001A\u0011AA\u0017\u0011\u001d\ty\u0003\u0001C\u0001\u0003[\u0011\u0011d\u0015;bi\u0016\u001cFo\u001c:f\u0007\"\fgnZ3m_\u001e<&/\u001b;fe*\u0011QCF\u0001\u0006gR\fG/\u001a\u0006\u0003/a\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005eQ\u0012!C3yK\u000e,H/[8o\u0015\tYB$A\u0002tc2T!!\b\u0010\u0002\u000bM\u0004\u0018M]6\u000b\u0005}\u0001\u0013AB1qC\u000eDWMC\u0001\"\u0003\ry'oZ\u0002\u0001'\r\u0001AE\u000b\t\u0003K!j\u0011A\n\u0006\u0002O\u0005)1oY1mC&\u0011\u0011F\n\u0002\u0007\u0003:L(+\u001a4\u0011\u0005-rS\"\u0001\u0017\u000b\u00055b\u0012\u0001C5oi\u0016\u0014h.\u00197\n\u0005=b#a\u0002'pO\u001eLgnZ\u0001\u0003M6\u0004\"AM\u001a\u000e\u0003YI!\u0001\u000e\f\u0003+\rCWmY6q_&tGOR5mK6\u000bg.Y4fe\u0006!a-\u001b7f!\t9D(D\u00019\u0015\tI$(\u0001\u0002gg*\u00111HH\u0001\u0007Q\u0006$wn\u001c9\n\u0005uB$\u0001\u0002)bi\"\f\u0001cY8naJ,7o]5p]\u000e{G-Z2\u0011\u0005\u0001\u001bU\"A!\u000b\u0005\tc\u0012AA5p\u0013\t!\u0015I\u0001\tD_6\u0004(/Z:tS>t7i\u001c3fG\u00061A(\u001b8jiz\"BaR%K\u0017B\u0011\u0001\nA\u0007\u0002)!)\u0001\u0007\u0002a\u0001c!)Q\u0007\u0002a\u0001m!)a\b\u0002a\u0001\u007f\u0005q1m\\7qe\u0016\u001c8o\u0015;sK\u0006lGC\u0001(V!\ty5+D\u0001Q\u0015\t\u0011\u0015KC\u0001S\u0003\u0011Q\u0017M^1\n\u0005Q\u0003&\u0001\u0005#bi\u0006|U\u000f\u001e9viN#(/Z1n\u0011\u00151V\u00011\u0001O\u00031yW\u000f\u001e9viN#(/Z1n\u0003E\u0011\u0017mY6j]\u001e4\u0015\u000e\\3TiJ,\u0017-\\\u000b\u00023B\u0011!\f\u001c\b\u00037*t!\u0001X5\u000f\u0005uCgB\u00010h\u001d\tyfM\u0004\u0002aK:\u0011\u0011\rZ\u0007\u0002E*\u00111MI\u0001\u0007yI|w\u000e\u001e \n\u0003\u0005J!a\b\u0011\n\u0005uq\u0012BA\u000e\u001d\u0013\tI\"$\u0003\u0002\u00181%\u00111NF\u0001\u0016\u0007\",7m\u001b9pS:$h)\u001b7f\u001b\u0006t\u0017mZ3s\u0013\tigNA\u000fDC:\u001cW\r\u001c7bE2,gi\u0015#bi\u0006|U\u000f\u001e9viN#(/Z1n\u0015\tYg#A\u000bcC\u000e\\\u0017N\\4GS2,7\u000b\u001e:fC6|F%Z9\u0015\u0005E$\bCA\u0013s\u0013\t\u0019hE\u0001\u0003V]&$\bbB;\b\u0003\u0003\u0005\r!W\u0001\u0004q\u0012\n\u0014A\u00052bG.Lgn\u001a$jY\u0016\u001cFO]3b[\u0002\n\u0001cY8naJ,7o]3e'R\u0014X-Y7\u0016\u00039\u000bAcY8naJ,7o]3e'R\u0014X-Y7`I\u0015\fHCA9|\u0011\u001d)(\"!AA\u00029\u000b\u0011cY8naJ,7o]3e'R\u0014X-Y7!\u0003\u0011\u0019\u0018N_3\u0016\u0003}\u00042!JA\u0001\u0013\r\t\u0019A\n\u0002\u0004\u0013:$\u0018\u0001C:ju\u0016|F%Z9\u0015\u0007E\fI\u0001C\u0004v\u001b\u0005\u0005\t\u0019A@\u0002\u000bML'0\u001a\u0011\u0002\u0007A,H\u000fF\u0003r\u0003#\t\t\u0003C\u0004\u0002\u0014=\u0001\r!!\u0006\u0002\u0007-,\u0017\u0010E\u0003&\u0003/\tY\"C\u0002\u0002\u001a\u0019\u0012Q!\u0011:sCf\u00042!JA\u000f\u0013\r\tyB\n\u0002\u0005\u0005f$X\rC\u0004\u0002$=\u0001\r!!\u0006\u0002\u000bY\fG.^3\u0002\r\u0011,G.\u001a;f)\r\t\u0018\u0011\u0006\u0005\b\u0003'\u0001\u0002\u0019AA\u000b\u0003\u0015\t'm\u001c:u)\u0005\t\u0018AB2p[6LG\u000f")
public class StateStoreChangelogWriter
implements Logging {
    private final Path file;
    private final CompressionCodec compressionCodec;
    private CheckpointFileManager.CancellableFSDataOutputStream backingFileStream;
    private DataOutputStream compressedStream;
    private int size;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private DataOutputStream compressStream(DataOutputStream outputStream) {
        OutputStream compressed = this.compressionCodec.compressedOutputStream((OutputStream)outputStream);
        return new DataOutputStream(compressed);
    }

    private CheckpointFileManager.CancellableFSDataOutputStream backingFileStream() {
        return this.backingFileStream;
    }

    private void backingFileStream_$eq(CheckpointFileManager.CancellableFSDataOutputStream x$1) {
        this.backingFileStream = x$1;
    }

    private DataOutputStream compressedStream() {
        return this.compressedStream;
    }

    private void compressedStream_$eq(DataOutputStream x$1) {
        this.compressedStream = x$1;
    }

    public int size() {
        return this.size;
    }

    public void size_$eq(int x$1) {
        this.size = x$1;
    }

    public void put(byte[] key, byte[] value) {
        Predef$.MODULE$.assert(this.compressedStream() != null);
        this.compressedStream().writeInt(ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.byteArrayOps(key)));
        this.compressedStream().write(key);
        this.compressedStream().writeInt(ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.byteArrayOps(value)));
        this.compressedStream().write(value);
        this.size_$eq(this.size() + 1);
    }

    public void delete(byte[] key) {
        Predef$.MODULE$.assert(this.compressedStream() != null);
        this.compressedStream().writeInt(ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.byteArrayOps(key)));
        this.compressedStream().write(key);
        this.compressedStream().writeInt(-1);
        this.size_$eq(this.size() + 1);
    }

    public void abort() {
        block9: {
            try {
                try {
                    if (this.backingFileStream() != null) {
                        this.backingFileStream().cancel();
                    }
                    if (this.compressedStream() != null) {
                        IOUtils.closeQuietly((OutputStream)this.compressedStream());
                    }
                }
                catch (Throwable throwable) {
                    Option option;
                    FSError fSError;
                    Throwable throwable2 = throwable;
                    if (throwable2 instanceof FSError && (fSError = (FSError)throwable2).getCause() instanceof IOException) {
                        break block9;
                    }
                    if (throwable2 != null && !(option = NonFatal$.MODULE$.unapply(throwable2)).isEmpty()) {
                        Throwable ex = (Throwable)option.get();
                        this.logInfo((Function0<String>)(Function0 & Serializable)() -> "Failed to cancel changelog file " + $this.file + " for state store provider with exception=" + ex);
                        break block9;
                    }
                    throw throwable;
                }
            }
            finally {
                this.backingFileStream_$eq(null);
                this.compressedStream_$eq(null);
            }
        }
    }

    public void commit() {
        try {
            try {
                this.compressedStream().writeInt(-1);
                this.compressedStream().close();
            }
            catch (Throwable e) {
                this.abort();
                this.logError((Function0<String>)(Function0 & Serializable)() -> "Fail to commit changelog file " + $this.file + " because of exception " + e);
                throw e;
            }
        }
        finally {
            this.backingFileStream_$eq(null);
            this.compressedStream_$eq(null);
        }
    }

    public StateStoreChangelogWriter(CheckpointFileManager fm, Path file, CompressionCodec compressionCodec) {
        this.file = file;
        this.compressionCodec = compressionCodec;
        Logging.$init$((Logging)this);
        this.backingFileStream = fm.createAtomic(file, true);
        this.compressedStream = this.compressStream((DataOutputStream)((Object)this.backingFileStream()));
        this.size = 0;
    }
}

