/*
 * Decompiled with CFR 0.152.
 */
package kafka.durability.topic;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.function.Supplier;
import kafka.durability.db.DurabilityDB;
import kafka.durability.events.AbstractDurabilityEvent;
import kafka.durability.materialization.DurabilityEventsMaterialize;
import kafka.durability.topic.DurabilityTopicConfig;
import kafka.durability.topic.DurabilityTopicConsumer;
import kafka.durability.topic.DurabilityTopicManager$;
import kafka.durability.topic.DurabilityTopicProducer;
import kafka.utils.Logging;
import kafka.zk.AdminZkClient;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005mf\u0001\u0002\u0013&\u00011B\u0001\"\u000f\u0001\u0003\u0006\u0004%\tA\u000f\u0005\t\u007f\u0001\u0011\t\u0011)A\u0005w!A\u0001\t\u0001BC\u0002\u0013\u0005\u0011\t\u0003\u0005I\u0001\t\u0005\t\u0015!\u0003C\u0011!I\u0005A!b\u0001\n\u0003Q\u0005\u0002C)\u0001\u0005\u0003\u0005\u000b\u0011B&\t\u0011I\u0003!Q1A\u0005\u0002MC\u0001\u0002\u001a\u0001\u0003\u0002\u0003\u0006I\u0001\u0016\u0005\tK\u0002\u0011\t\u0011)A\u0005M\")!\u000f\u0001C\u0001g\"9!\u0010\u0001b\u0001\n\u0013Y\bBB@\u0001A\u0003%A\u0010C\u0006\u0002\u0002\u0001A)\u0019!C\u0001K\u0005\r\u0001bCA\u0006\u0001!\u0015\r\u0011\"\u0001&\u0003\u001bA\u0011\"!\u0006\u0001\u0001\u0004%\t!a\u0006\t\u0013\u0005}\u0001\u00011A\u0005\u0002\u0005\u0005\u0002\u0002CA\u0017\u0001\u0001\u0006K!!\u0007\t\u0013\u0005]\u0002\u00011A\u0005\u0002\u0005]\u0001\"CA\u001d\u0001\u0001\u0007I\u0011AA\u001e\u0011!\ty\u0004\u0001Q!\n\u0005e\u0001\"CA\"\u0001\t\u0007I\u0011AA#\u0011!\t\u0019\u0007\u0001Q\u0001\n\u0005\u001d\u0003bBA3\u0001\u0011\u0005\u0011q\r\u0005\b\u0003S\u0002A\u0011AA6\u0011\u001d\ti\u0007\u0001C\u0005\u0003WBq!a\u001c\u0001\t\u0003\t9\u0007C\u0004\u0002r\u0001!I!a\u001a\t\u000f\u0005M\u0004\u0001\"\u0003\u0002h!9\u0011Q\u000f\u0001\u0005\n\u0005]\u0004bBA?\u0001\u0011%\u0011q\u0010\u0005\b\u0003'\u0003A\u0011AAK\u000f%\tY*JA\u0001\u0012\u0003\tiJ\u0002\u0005%K\u0005\u0005\t\u0012AAP\u0011\u0019\u0011\u0018\u0005\"\u0001\u0002\"\"I\u00111U\u0011\u0012\u0002\u0013\u0005\u0011Q\u0015\u0002\u0017\tV\u0014\u0018MY5mSRLHk\u001c9jG6\u000bg.Y4fe*\u0011aeJ\u0001\u0006i>\u0004\u0018n\u0019\u0006\u0003Q%\n!\u0002Z;sC\nLG.\u001b;z\u0015\u0005Q\u0013!B6bM.\f7\u0001A\n\u0004\u00015\u001a\u0004C\u0001\u00182\u001b\u0005y#\"\u0001\u0019\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iz#AB!osJ+g\r\u0005\u00025o5\tQG\u0003\u00027S\u0005)Q\u000f^5mg&\u0011\u0001(\u000e\u0002\b\u0019><w-\u001b8h\u0003\u0019\u0019wN\u001c4jOV\t1\b\u0005\u0002={5\tQ%\u0003\u0002?K\t)B)\u001e:bE&d\u0017\u000e^=U_BL7mQ8oM&<\u0017aB2p]\u001aLw\rI\u0001\tI\u0006$\u0018MY1tKV\t!\t\u0005\u0002D\r6\tAI\u0003\u0002FO\u0005\u0011AMY\u0005\u0003\u000f\u0012\u0013A\u0002R;sC\nLG.\u001b;z\t\n\u000b\u0011\u0002Z1uC\n\f7/\u001a\u0011\u0002\u00175\fG/\u001a:jC2L'0Z\u000b\u0002\u0017B\u0011AjT\u0007\u0002\u001b*\u0011ajJ\u0001\u0010[\u0006$XM]5bY&T\u0018\r^5p]&\u0011\u0001+\u0014\u0002\u001c\tV\u0014\u0018MY5mSRLXI^3oiNl\u0015\r^3sS\u0006d\u0017N_3\u0002\u00195\fG/\u001a:jC2L'0\u001a\u0011\u0002\u0015i\\7+\u001e9qY&,'/F\u0001U!\r)FLX\u0007\u0002-*\u0011q\u000bW\u0001\tMVt7\r^5p]*\u0011\u0011LW\u0001\u0005kRLGNC\u0001\\\u0003\u0011Q\u0017M^1\n\u0005u3&\u0001C*vaBd\u0017.\u001a:\u0011\u0005}\u0013W\"\u00011\u000b\u0005\u0005L\u0013A\u0001>l\u0013\t\u0019\u0007MA\u0007BI6LgNW6DY&,g\u000e^\u0001\fu.\u001cV\u000f\u001d9mS\u0016\u0014\b%\u0001\u0003uS6,\u0007CA4q\u001b\u0005A'B\u0001\u001cj\u0015\tQ7.\u0001\u0004d_6lwN\u001c\u0006\u0003U1T!!\u001c8\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0017aA8sO&\u0011\u0011\u000f\u001b\u0002\u0005)&lW-\u0001\u0004=S:LGO\u0010\u000b\u0007iV4x\u000f_=\u0011\u0005q\u0002\u0001\"B\u001d\u000b\u0001\u0004Y\u0004\"\u0002!\u000b\u0001\u0004\u0011\u0005\"B%\u000b\u0001\u0004Y\u0005\"\u0002*\u000b\u0001\u0004!\u0006bB3\u000b!\u0003\u0005\rAZ\u0001\u001f\tV\u0013\u0016IQ%M\u0013RKvl\u0011*F\u0003RKuJT0C\u0003\u000e[uJ\u0012$`\u001bN+\u0012\u0001 \t\u0003]uL!A`\u0018\u0003\t1{gnZ\u0001 \tV\u0013\u0016IQ%M\u0013RKvl\u0011*F\u0003RKuJT0C\u0003\u000e[uJ\u0012$`\u001bN\u0003\u0013\u0001E2p]N,X.\u001a:Qe>4\u0018\u000eZ3s+\t\t)\u0001E\u0002=\u0003\u000fI1!!\u0003&\u0005]!UO]1cS2LG/\u001f+pa&\u001c7i\u001c8tk6,'/\u0001\tqe>$WoY3s!J|g/\u001b3feV\u0011\u0011q\u0002\t\u0004y\u0005E\u0011bAA\nK\t9B)\u001e:bE&d\u0017\u000e^=U_BL7\r\u0015:pIV\u001cWM]\u0001\u0006e\u0016\fG-_\u000b\u0003\u00033\u00012ALA\u000e\u0013\r\tib\f\u0002\b\u0005>|G.Z1o\u0003%\u0011X-\u00193z?\u0012*\u0017\u000f\u0006\u0003\u0002$\u0005%\u0002c\u0001\u0018\u0002&%\u0019\u0011qE\u0018\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003W\u0001\u0012\u0011!a\u0001\u00033\t1\u0001\u001f\u00132\u0003\u0019\u0011X-\u00193zA!\u001a\u0011#!\r\u0011\u00079\n\u0019$C\u0002\u00026=\u0012\u0001B^8mCRLG.Z\u0001\u000bSN\u001c\u0006.\u001e;e_^t\u0017AD5t'\",H\u000fZ8x]~#S-\u001d\u000b\u0005\u0003G\ti\u0004C\u0005\u0002,M\t\t\u00111\u0001\u0002\u001a\u0005Y\u0011n]*ikR$wn\u001e8!Q\r!\u0012\u0011G\u0001\rcV,W/\u001a3Fm\u0016tGo]\u000b\u0003\u0003\u000f\u0002b!!\u0013\u0002T\u0005]SBAA&\u0015\u0011\ti%a\u0014\u0002\u000f5,H/\u00192mK*\u0019\u0011\u0011K\u0018\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002V\u0005-#!B)vKV,\u0007\u0003BA-\u0003?j!!a\u0017\u000b\u0007\u0005us%\u0001\u0004fm\u0016tGo]\u0005\u0005\u0003C\nYFA\fBEN$(/Y2u\tV\u0014\u0018MY5mSRLXI^3oi\u0006i\u0011/^3vK\u0012,e/\u001a8ug\u0002\nQa\u001d;beR$\"!a\t\u0002\u000f%\u001c(+Z1esR\u0011\u0011\u0011D\u0001\u000fiJL()Z2p[\u0016\u0014V-\u00193z\u0003!\u0019\b.\u001e;e_^t\u0017!D:uCJ$8i\u001c8tk6,'/A\u0004dY\u0016\fg.\u001e9\u0002\u0017\u0015t7/\u001e:f)>\u0004\u0018n\u0019\u000b\u0005\u0003G\tI\b\u0003\u0004\u0002|u\u0001\rAX\u0001\u000eC\u0012l\u0017N\u001c.l\u00072LWM\u001c;\u0002\u0017Q|\u0007+\u0019:uSRLwN\u001c\u000b\u0005\u0003\u0003\u000b9\tE\u0002/\u0003\u0007K1!!\"0\u0005\rIe\u000e\u001e\u0005\b\u0003\u0013s\u0002\u0019AAF\u00039!x\u000e]5d!\u0006\u0014H/\u001b;j_:\u0004B!!$\u0002\u00106\t\u0011.C\u0002\u0002\u0012&\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.\u0001\nbI\u0012$UO]1cS2LG/_#wK:$H\u0003BA\u0012\u0003/Cq!!' \u0001\u0004\t9&A\u0003fm\u0016tG/\u0001\fEkJ\f'-\u001b7jif$v\u000e]5d\u001b\u0006t\u0017mZ3s!\ta\u0014e\u0005\u0002\"[Q\u0011\u0011QT\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\u0005\u001d&f\u00014\u0002*.\u0012\u00111\u0016\t\u0005\u0003[\u000b9,\u0004\u0002\u00020*!\u0011\u0011WAZ\u0003%)hn\u00195fG.,GMC\u0002\u00026>\n!\"\u00198o_R\fG/[8o\u0013\u0011\tI,a,\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
public class DurabilityTopicManager
implements Logging {
    private DurabilityTopicConsumer consumerProvider;
    private DurabilityTopicProducer producerProvider;
    private final DurabilityTopicConfig config;
    private final DurabilityDB database;
    private final DurabilityEventsMaterialize materialize;
    private final Supplier<AdminZkClient> zkSupplier;
    private final Time time;
    private final long DURABILITY_CREATION_BACKOFF_MS;
    private volatile boolean ready;
    private volatile boolean isShutdown;
    private final Queue<AbstractDurabilityEvent> queuedEvents;
    private Logger logger;
    private String logIdent;
    private volatile byte bitmap$0;

    public static Time $lessinit$greater$default$5() {
        return DurabilityTopicManager$.MODULE$.$lessinit$greater$default$5();
    }

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if ((byte)(this.bitmap$0 & 4) == 0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public DurabilityTopicConfig config() {
        return this.config;
    }

    public DurabilityDB database() {
        return this.database;
    }

    public DurabilityEventsMaterialize materialize() {
        return this.materialize;
    }

    public Supplier<AdminZkClient> zkSupplier() {
        return this.zkSupplier;
    }

    private long DURABILITY_CREATION_BACKOFF_MS() {
        return this.DURABILITY_CREATION_BACKOFF_MS;
    }

    private DurabilityTopicConsumer consumerProvider$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.consumerProvider = new DurabilityTopicConsumer(this.config(), this.database(), this.materialize());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.consumerProvider;
    }

    public DurabilityTopicConsumer consumerProvider() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.consumerProvider$lzycompute();
        }
        return this.consumerProvider;
    }

    private DurabilityTopicProducer producerProvider$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.producerProvider = new DurabilityTopicProducer(this.config());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.producerProvider;
    }

    public DurabilityTopicProducer producerProvider() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.producerProvider$lzycompute();
        }
        return this.producerProvider;
    }

    public boolean ready() {
        return this.ready;
    }

    public void ready_$eq(boolean x$1) {
        this.ready = x$1;
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    public void isShutdown_$eq(boolean x$1) {
        this.isShutdown = x$1;
    }

    public Queue<AbstractDurabilityEvent> queuedEvents() {
        return this.queuedEvents;
    }

    public void start() {
        try {
            while (!this.ready() && !this.isShutdown()) {
                if (this.tryBecomeReady()) continue;
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to become ready.");
                this.time.sleep(this.DURABILITY_CREATION_BACKOFF_MS());
            }
        }
        catch (Exception e) {
            if (this.isShutdown()) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Ignoring exception caught during shutdown", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                return;
            }
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Caught exception while starting DurabilityTopicManager", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
        }
    }

    public boolean isReady() {
        return this.ready() && this.consumerProvider().isReady();
    }

    private boolean tryBecomeReady() {
        if (this.config().interBrokerClientConfigs().get().isEmpty()) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Could not resolve bootstrap server. Will retry.");
            return false;
        }
        try {
            this.ensureTopic(this.zkSupplier().get());
        }
        catch (Exception e) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Caught exception when ensuring durability topic is created. Will retry.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            return false;
        }
        this.startConsumer();
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Started consumer for DurabilityTopicManager");
        return true;
    }

    public void shutdown() {
        this.isShutdown_$eq(true);
        this.cleanup();
    }

    private void startConsumer() {
        this.consumerProvider().startConsumer(this.consumerProvider().startConsumer$default$1());
        synchronized (this) {
            this.ready_$eq(true);
            this.queuedEvents().foreach((Function1 & Serializable & scala.Serializable)event -> {
                this.addDurabilityEvent(event);
                return BoxedUnit.UNIT;
            });
            return;
        }
    }

    private synchronized void cleanup() {
        try {
            this.ready_$eq(false);
            if (this.producerProvider() != null) {
                try {
                    this.producerProvider().get().close(Duration.ofSeconds(2L));
                }
                catch (InterruptException interruptException) {
                    this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Received interrupt while being blocked on producer, will close", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> interruptException);
                }
                catch (Exception e) {
                    this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Received unknown error while closing producer", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                }
            }
        }
        finally {
            this.consumerProvider().shutdown();
        }
    }

    private void ensureTopic(AdminZkClient adminZkClient) {
        try {
            adminZkClient.createTopic(this.config().topicName(), this.config().configuredNumPartitions(), this.config().configuredReplicationFactor(), this.config().toTopicProperties(), adminZkClient.createTopic$default$5(), adminZkClient.createTopic$default$6(), adminZkClient.createTopic$default$7(), adminZkClient.createTopic$default$8());
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(31).append("Created topic ").append(this.config().topicName()).append(" with ").append(this.config().configuredNumPartitions()).append(" partitions").toString());
            return;
        }
        catch (TopicExistsException topicExistsException) {
            int numPartitions = BoxesRunTime.unboxToInt((Object)adminZkClient.numPartitions((Set<String>)((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.config().topicName()})))).apply((Object)this.config().topicName()));
            if (numPartitions != this.config().configuredNumPartitions()) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(98).append("Topic ").append(this.config().topicName()).append(" already exists. Mismatch between existing partition count ").append(numPartitions).append(" ").append("and configured partition count ").append(this.config().configuredNumPartitions()).append(".").toString());
                return;
            }
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Topic ").append(this.config().topicName()).append(" exists with ").append(numPartitions).append(" partitions").toString());
            return;
        }
    }

    private int toPartition(TopicPartition topicPartition) {
        return Math.abs(topicPartition.hashCode()) % this.config().configuredNumPartitions();
    }

    public void addDurabilityEvent(AbstractDurabilityEvent event) {
        synchronized (this) {
            if (this.isShutdown()) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(72).append("Durability Topic Manager service shutting down, ignoring append request ").append(event).toString());
            } else {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("DurabilityEvent: Sending ").append(event).append(" to durability topic").toString());
                this.liftedTree1$1(event);
            }
            return;
        }
    }

    private final /* synthetic */ Object liftedTree1$1(AbstractDurabilityEvent event$1) {
        Object object;
        try {
            if (this.ready()) {
                return this.producerProvider().get().send(new ProducerRecord(this.config().topicName(), Predef$.MODULE$.int2Integer(this.toPartition(event$1.topicPartition())), (Object)event$1.serializeKey(), (Object)event$1.serializeValue()));
            }
            object = this.queuedEvents().$colon$plus((Object)event$1, Queue$.MODULE$.canBuildFrom());
        }
        catch (Exception ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(33).append("Exception in addDurabilityEvent ").append(ex.getCause()).append(" ").append(ex.getMessage()).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
            object = BoxedUnit.UNIT;
        }
        return object;
    }

    public DurabilityTopicManager(DurabilityTopicConfig config, DurabilityDB database, DurabilityEventsMaterialize materialize, Supplier<AdminZkClient> zkSupplier, Time time) {
        this.config = config;
        this.database = database;
        this.materialize = materialize;
        this.zkSupplier = zkSupplier;
        this.time = time;
        Logging.$init$(this);
        this.DURABILITY_CREATION_BACKOFF_MS = 5000L;
        this.ready = false;
        this.isShutdown = false;
        this.queuedEvents = (Queue)Queue$.MODULE$.apply((Seq)Nil$.MODULE$);
    }
}

