/*
 * Decompiled with CFR 0.152.
 */
package kafka.durability.topic;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import kafka.durability.db.DurabilityDB;
import kafka.durability.materialization.DurabilityEventsMaterialize;
import kafka.durability.topic.DurabilityTopicClient$;
import kafka.durability.topic.DurabilityTopicConfig;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005ed\u0001\u0002\u000e\u001c\u0001\tB\u0001b\u000e\u0001\u0003\u0006\u0004%\t\u0001\u000f\u0005\t{\u0001\u0011\t\u0011)A\u0005s!Aa\b\u0001BC\u0002\u0013\u0005q\b\u0003\u0005F\u0001\t\u0005\t\u0015!\u0003A\u0011!1\u0005A!b\u0001\n\u00039\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011\u0002%\t\u000b=\u0003A\u0011\u0001)\t\u000fU\u0003\u0001\u0019!C\u0001-\"9!\f\u0001a\u0001\n\u0003Y\u0006BB1\u0001A\u0003&q\u000bC\u0004g\u0001\u0001\u0007I\u0011\u0001,\t\u000f\u001d\u0004\u0001\u0019!C\u0001Q\"1!\u000e\u0001Q!\n]Cq\u0001\u001c\u0001C\u0002\u0013%Q\u000e\u0003\u0004{\u0001\u0001\u0006IA\u001c\u0005\u0006w\u0002!\tA\u0016\u0005\u0006y\u0002!\t! \u0005\b}\u0002\u0011\r\u0011\"\u0003\u0000\u0011!\t9\u0001\u0001Q\u0001\n\u0005\u0005\u0001BCA\u0005\u0001!\u0015\r\u0011\"\u0001\u0002\f!1\u0011q\u0005\u0001\u0005BuDq!!\u000b\u0001\t\u0003\tY\u0003\u0003\u0005\u0002L\u0001!\taGA'\u0011)\t\u0019\u0006AI\u0001\n\u0003Y\u0012Q\u000b\u0005\t\u0003W\u0002A\u0011A\u000e\u0002n\t9B)\u001e:bE&d\u0017\u000e^=U_BL7mQ8ogVlWM\u001d\u0006\u00039u\tQ\u0001^8qS\u000eT!AH\u0010\u0002\u0015\u0011,(/\u00192jY&$\u0018PC\u0001!\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019B\u0001A\u0012*_A\u0011AeJ\u0007\u0002K)\ta%A\u0003tG\u0006d\u0017-\u0003\u0002)K\t1\u0011I\\=SK\u001a\u0004\"AK\u0017\u000e\u0003-R!\u0001L\u0010\u0002\u000bU$\u0018\u000e\\:\n\u00059Z#a\u0002'pO\u001eLgn\u001a\t\u0003aUj\u0011!\r\u0006\u0003eM\nA\u0001\\1oO*\tA'\u0001\u0003kCZ\f\u0017B\u0001\u001c2\u0005!\u0011VO\u001c8bE2,\u0017AB2p]\u001aLw-F\u0001:!\tQ4(D\u0001\u001c\u0013\ta4DA\u000bEkJ\f'-\u001b7jif$v\u000e]5d\u0007>tg-[4\u0002\u000f\r|gNZ5hA\u0005\u0011AMY\u000b\u0002\u0001B\u0011\u0011iQ\u0007\u0002\u0005*\u0011a(H\u0005\u0003\t\n\u0013A\u0002R;sC\nLG.\u001b;z\t\n\u000b1\u0001\u001a2!\u0003-i\u0017\r^3sS\u0006d\u0017N_3\u0016\u0003!\u0003\"!\u0013'\u000e\u0003)S!aS\u000f\u0002\u001f5\fG/\u001a:jC2L'0\u0019;j_:L!!\u0014&\u00037\u0011+(/\u00192jY&$\u00180\u0012<f]R\u001cX*\u0019;fe&\fG.\u001b>f\u00031i\u0017\r^3sS\u0006d\u0017N_3!\u0003\u0019a\u0014N\\5u}Q!\u0011KU*U!\tQ\u0004\u0001C\u00038\u000f\u0001\u0007\u0011\bC\u0003?\u000f\u0001\u0007\u0001\tC\u0003G\u000f\u0001\u0007\u0001*A\u0003sK\u0006$\u00170F\u0001X!\t!\u0003,\u0003\u0002ZK\t9!i\\8mK\u0006t\u0017!\u0003:fC\u0012Lx\fJ3r)\tav\f\u0005\u0002%;&\u0011a,\n\u0002\u0005+:LG\u000fC\u0004a\u0013\u0005\u0005\t\u0019A,\u0002\u0007a$\u0013'\u0001\u0004sK\u0006$\u0017\u0010\t\u0015\u0003\u0015\r\u0004\"\u0001\n3\n\u0005\u0015,#\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002\u0015\u0011|7\u000b[;uI><h.\u0001\be_NCW\u000f\u001e3po:|F%Z9\u0015\u0005qK\u0007b\u00021\r\u0003\u0003\u0005\raV\u0001\fI>\u001c\u0006.\u001e;e_^t\u0007\u0005\u000b\u0002\u000eG\u0006q1m\u001c8tk6,'\u000f\u00165sK\u0006$W#\u00018\u0011\u0005=DX\"\u00019\u000b\u00051\n(B\u0001:t\u0003\u0019\u0019w.\\7p]*\u0011\u0001\u0005\u001e\u0006\u0003kZ\fa!\u00199bG\",'\"A<\u0002\u0007=\u0014x-\u0003\u0002za\nY1*\u00194lCRC'/Z1e\u0003=\u0019wN\\:v[\u0016\u0014H\u000b\u001b:fC\u0012\u0004\u0013aB5t%\u0016\fG-_\u0001\tg\",H\u000fZ8x]R\tA,\u0001\u0006dY&,g\u000e\u001e+za\u0016,\"!!\u0001\u0011\u0007A\n\u0019!C\u0002\u0002\u0006E\u0012aa\u0015;sS:<\u0017aC2mS\u0016tG\u000fV=qK\u0002\n\u0001bY8ogVlWM]\u000b\u0003\u0003\u001b\u0001\u0002\"a\u0004\u0002\u0018\u0005m\u00111D\u0007\u0003\u0003#QA!!\u0003\u0002\u0014)\u0019\u0011QC:\u0002\u000f\rd\u0017.\u001a8ug&!\u0011\u0011DA\t\u0005!\u0019uN\\:v[\u0016\u0014\b#\u0002\u0013\u0002\u001e\u0005\u0005\u0012bAA\u0010K\t)\u0011I\u001d:bsB\u0019A%a\t\n\u0007\u0005\u0015RE\u0001\u0003CsR,\u0017a\u0001:v]\u0006Q\u0001/\u0019:uSRLwN\\:\u0015\u0005\u00055\u0002CBA\u0018\u0003{\t\u0019E\u0004\u0003\u00022\u0005e\u0002cAA\u001aK5\u0011\u0011Q\u0007\u0006\u0004\u0003o\t\u0013A\u0002\u001fs_>$h(C\u0002\u0002<\u0015\na\u0001\u0015:fI\u00164\u0017\u0002BA \u0003\u0003\u00121aU3u\u0015\r\tY$\n\t\u0005\u0003\u000b\n9%D\u0001r\u0013\r\tI%\u001d\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u00035\u0019H/\u0019:u\u0007>t7/^7feR\u0019A,a\u0014\t\u0011\u0005Es\u0003%AA\u0002]\u000b!c\u001d;beR\u001cuN\\:v[\u0016$\u0006N]3bI\u000692\u000f^1si\u000e{gn];nKJ$C-\u001a4bk2$H%M\u000b\u0003\u0003/R3aVA-W\t\tY\u0006\u0005\u0003\u0002^\u0005\u001dTBAA0\u0015\u0011\t\t'a\u0019\u0002\u0013Ut7\r[3dW\u0016$'bAA3K\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005%\u0014q\f\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017A\u00049s_\u000e,7o\u001d*fG>\u0014Hm\u001d\u000b\u00049\u0006=\u0004bBA93\u0001\u0007\u00111O\u0001\be\u0016\u001cwN\u001d3t!!\ty!!\u001e\u0002\u001c\u0005m\u0011\u0002BA<\u0003#\u0011qbQ8ogVlWM\u001d*fG>\u0014Hm\u001d")
public class DurabilityTopicConsumer
implements Logging,
Runnable {
    private Consumer<byte[], byte[]> consumer;
    private final DurabilityTopicConfig config;
    private final DurabilityDB db;
    private final DurabilityEventsMaterialize materialize;
    private volatile boolean ready;
    private volatile boolean doShutdown;
    private final KafkaThread consumerThread;
    private final String clientType;
    private Logger logger;
    private String logIdent;
    private volatile byte bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public DurabilityTopicConfig config() {
        return this.config;
    }

    public DurabilityDB db() {
        return this.db;
    }

    public DurabilityEventsMaterialize materialize() {
        return this.materialize;
    }

    public boolean ready() {
        return this.ready;
    }

    public void ready_$eq(boolean x$1) {
        this.ready = x$1;
    }

    public boolean doShutdown() {
        return this.doShutdown;
    }

    public void doShutdown_$eq(boolean x$1) {
        this.doShutdown = x$1;
    }

    private KafkaThread consumerThread() {
        return this.consumerThread;
    }

    public boolean isReady() {
        return this.ready();
    }

    public synchronized void shutdown() {
        if (this.ready()) {
            this.doShutdown_$eq(true);
            this.consumer().wakeup();
            try {
                this.consumerThread().join();
                return;
            }
            catch (Exception ex) {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "DurabilityTopicConsumer shutdown interrupted, logging", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
                return;
            }
        }
    }

    private String clientType() {
        return this.clientType;
    }

    private Consumer<byte[], byte[]> consumer$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                String clientId = DurabilityTopicClient$.MODULE$.clientId(this.clientType(), this.config().clusterId(), this.config().brokerId(), 0);
                this.consumer = new KafkaConsumer(this.config().toConsumerProperties(clientId));
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.consumer;
    }

    public Consumer<byte[], byte[]> consumer() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.consumer$lzycompute();
        }
        return this.consumer;
    }

    @Override
    public void run() {
        try {
            try {
                while (!this.doShutdown()) {
                    ConsumerRecords records = this.consumer().poll(Duration.ofMillis(this.config().pollDurationMs()));
                    if (records == null) continue;
                    this.processRecords((ConsumerRecords<byte[], byte[]>)records);
                }
            }
            catch (Exception e) {
                if (this.doShutdown()) {
                    this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception caught during shutdown", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                } else {
                    this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception in TierTopicConsumer", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                }
            }
        }
        finally {
            this.ready_$eq(false);
            this.consumer().close();
        }
    }

    public Set<TopicPartition> partitions() {
        return ((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), this.config().configuredNumPartitions() - 1).map((Function1 & Serializable & scala.Serializable)x -> DurabilityTopicConsumer.$anonfun$partitions$1(this, BoxesRunTime.unboxToInt((Object)x)), IndexedSeq$.MODULE$.canBuildFrom())).toSet();
    }

    public void startConsumer(boolean startConsumeThread) {
        Set<TopicPartition> durabilityTopicPartitions = this.partitions();
        this.consumer().assign((Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)durabilityTopicPartitions.toList()).asJava());
        Seq<Object> offsets = this.db().getDurabilityTopicPartitionOffsets();
        durabilityTopicPartitions.foreach((Function1 & Serializable & scala.Serializable)topicPartition -> {
            DurabilityTopicConsumer.$anonfun$startConsumer$1(this, offsets, topicPartition);
            return BoxedUnit.UNIT;
        });
        if (startConsumeThread) {
            this.consumerThread().start();
        }
        this.ready_$eq(true);
    }

    public boolean startConsumer$default$1() {
        return true;
    }

    public void processRecords(ConsumerRecords<byte[], byte[]> records) {
        records.forEach(record -> {
            try {
                this.materialize().materialize((ConsumerRecord<byte[], byte[]>)record);
                this.db().updateDurabilityTopicPartitionOffset(record.partition(), record.offset() + 1L);
                return;
            }
            catch (Exception ex) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(99).append("Ignoring unsupported record ").append(record).append(" during materialization. Could be due to ").append("incompatibility during upgrade").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
                return;
            }
        });
    }

    public static final /* synthetic */ TopicPartition $anonfun$partitions$1(DurabilityTopicConsumer $this, int x) {
        return new TopicPartition($this.config().topicName(), x);
    }

    public static final /* synthetic */ void $anonfun$startConsumer$1(DurabilityTopicConsumer $this, Seq offsets$1, TopicPartition topicPartition) {
        $this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("seeking durability consumer for partition ").append(topicPartition.partition()).append(" to offset ").append(offsets$1.apply(topicPartition.partition())).toString());
        $this.consumer().seek(topicPartition, BoxesRunTime.unboxToLong((Object)offsets$1.apply(topicPartition.partition())));
    }

    public DurabilityTopicConsumer(DurabilityTopicConfig config, DurabilityDB db, DurabilityEventsMaterialize materialize) {
        this.config = config;
        this.db = db;
        this.materialize = materialize;
        Logging.$init$(this);
        this.ready = false;
        this.doShutdown = false;
        this.consumerThread = new KafkaThread("DurabilityTopicConsumer", (Runnable)this, false);
        this.clientType = "consumer";
    }
}

