/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.metrics.KafkaMetricsGroup;
import kafka.tools.MirrorMaker;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.IterableOnce;
import scala.collection.Map;
import scala.collection.StringOps$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.ControlThrowable;

public final class MirrorMaker$
implements KafkaMetricsGroup {
    public static final MirrorMaker$ MODULE$ = new MirrorMaker$();
    private static MirrorMaker.MirrorMakerProducer producer = null;
    private static Seq<MirrorMaker.MirrorMakerThread> kafka$tools$MirrorMaker$$mirrorMakerThreads = null;
    private static final AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown = new AtomicBoolean(false);
    private static final AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages = new AtomicInteger(0);
    private static MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler = null;
    private static int kafka$tools$MirrorMaker$$offsetCommitIntervalMs = 0;
    private static boolean kafka$tools$MirrorMaker$$abortOnSendFailure = true;
    private static volatile boolean kafka$tools$MirrorMaker$$exitingOnSendFailure = false;
    private static long lastSuccessfulCommitTime = -1L;
    private static final Time time = Time.SYSTEM;
    private static Logger logger;
    private static String logIdent;
    private static volatile boolean bitmap$0;

    static {
        KafkaMetricsGroup.newGauge$(MODULE$, "MirrorMaker-numDroppedMessages", (Gauge)new Gauge<Object>(){

            public final int value() {
                return MirrorMaker$.kafka$tools$MirrorMaker$$$anonfun$new$1();
            }
        }, KafkaMetricsGroup.newGauge$default$3$(MODULE$));
    }

    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup.metricName$(this, name, tags);
    }

    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup.explicitMetricName$(this, group, typeName, name, tags);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup.newGauge$(this, name, metric, tags);
    }

    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup.newGauge$default$3$(this);
    }

    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newMeter$(this, name, eventType, timeUnit, tags);
    }

    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup.newMeter$default$4$(this);
    }

    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup.newHistogram$(this, name, biased, tags);
    }

    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup.newHistogram$default$2$(this);
    }

    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup.newHistogram$default$3$(this);
    }

    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newTimer$(this, name, durationUnit, rateUnit, tags);
    }

    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup.newTimer$default$4$(this);
    }

    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup.removeMetric$(this, name, tags);
    }

    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup.removeMetric$default$2$(this);
    }

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!bitmap$0) {
                logger = Logging.logger$(this);
                bitmap$0 = true;
            }
        }
        return logger;
    }

    @Override
    public Logger logger() {
        if (!bitmap$0) {
            return this.logger$lzycompute();
        }
        return logger;
    }

    @Override
    public String logIdent() {
        return logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        logIdent = x$1;
    }

    public MirrorMaker.MirrorMakerProducer producer() {
        return producer;
    }

    public void producer_$eq(MirrorMaker.MirrorMakerProducer x$1) {
        producer = x$1;
    }

    private Seq<MirrorMaker.MirrorMakerThread> mirrorMakerThreads() {
        return kafka$tools$MirrorMaker$$mirrorMakerThreads;
    }

    public void kafka$tools$MirrorMaker$$mirrorMakerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> x$1) {
        kafka$tools$MirrorMaker$$mirrorMakerThreads = x$1;
    }

    public AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown() {
        return kafka$tools$MirrorMaker$$isShuttingDown;
    }

    public AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages() {
        return kafka$tools$MirrorMaker$$numDroppedMessages;
    }

    public MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler() {
        return kafka$tools$MirrorMaker$$messageHandler;
    }

    public void kafka$tools$MirrorMaker$$messageHandler_$eq(MirrorMaker.MirrorMakerMessageHandler x$1) {
        kafka$tools$MirrorMaker$$messageHandler = x$1;
    }

    public int kafka$tools$MirrorMaker$$offsetCommitIntervalMs() {
        return kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    }

    public void kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq(int x$1) {
        kafka$tools$MirrorMaker$$offsetCommitIntervalMs = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$abortOnSendFailure() {
        return kafka$tools$MirrorMaker$$abortOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(boolean x$1) {
        kafka$tools$MirrorMaker$$abortOnSendFailure = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$exitingOnSendFailure() {
        return kafka$tools$MirrorMaker$$exitingOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$exitingOnSendFailure_$eq(boolean x$1) {
        kafka$tools$MirrorMaker$$exitingOnSendFailure = x$1;
    }

    private long lastSuccessfulCommitTime() {
        return lastSuccessfulCommitTime;
    }

    private void lastSuccessfulCommitTime_$eq(long x$1) {
        lastSuccessfulCommitTime = x$1;
    }

    private Time time() {
        return time;
    }

    public void main(String[] args) {
        block5: {
            if (this.logger().underlying().isWarnEnabled()) {
                String msgWithLogIdent_msg = "This tool is deprecated and may be removed in a future major release.";
                Object var4_2 = null;
                this.logger().underlying().warn(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = "Starting mirror maker";
                Object var5_3 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            try {
                MirrorMaker.MirrorMakerOptions opts = new MirrorMaker.MirrorMakerOptions(args);
                CommandLineUtils$.MODULE$.printHelpAndExitIfNeeded(opts, "This tool helps to continuously copy data between two Kafka clusters.");
                opts.checkArgs();
            }
            catch (ControlThrowable controlThrowable) {
                throw controlThrowable;
            }
            catch (Throwable t) {
                if (!this.logger().underlying().isErrorEnabled()) break block5;
                String msgWithLogIdent_msg = "Exception when starting mirror maker.";
                Object var6_6 = null;
                this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), t);
            }
        }
        this.mirrorMakerThreads().foreach((Function1 & Serializable)x$1 -> {
            x$1.start();
            return BoxedUnit.UNIT;
        });
        this.mirrorMakerThreads().foreach((Function1 & Serializable)x$2 -> {
            x$2.awaitShutdown();
            return BoxedUnit.UNIT;
        });
    }

    public Seq<MirrorMaker.ConsumerWrapper> createConsumers(int numStreams, Properties consumerConfigProps, Option<ConsumerRebalanceListener> customRebalanceListener, Option<String> include) {
        this.kafka$tools$MirrorMaker$$maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false");
        consumerConfigProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfigProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        String groupIdString = consumerConfigProps.getProperty("group.id");
        IndexedSeq consumers = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1 & Serializable)i -> MirrorMaker$.$anonfun$createConsumers$1(consumerConfigProps, groupIdString, BoxesRunTime.unboxToInt((Object)i)));
        include.getOrElse((Function0 & Serializable)() -> {
            throw new IllegalArgumentException("include list cannot be empty");
        });
        return (Seq)consumers.map((Function1 & Serializable)consumer -> new MirrorMaker.ConsumerWrapper((Consumer<byte[], byte[]>)consumer, customRebalanceListener, include));
    }

    public void commitOffsets(MirrorMaker.ConsumerWrapper consumerWrapper) {
        if (!this.kafka$tools$MirrorMaker$$exitingOnSendFailure()) {
            int n = 0;
            boolean retryNeeded = true;
            while (retryNeeded) {
                if (this.logger().underlying().isTraceEnabled()) {
                    String msgWithLogIdent_msg = "Committing offsets.";
                    Object var7_7 = null;
                    this.logger().underlying().trace(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                }
                try {
                    consumerWrapper.commit();
                    this.lastSuccessfulCommitTime_$eq(this.time().milliseconds());
                    retryNeeded = false;
                }
                catch (WakeupException e) {
                    this.commitOffsets(consumerWrapper);
                    throw e;
                }
                catch (TimeoutException timeoutException) {
                    Try try_ = Try$.MODULE$.apply((Function0 & Serializable)() -> consumerWrapper.consumer().listTopics());
                    if (try_ instanceof Success) {
                        java.util.Map visibleTopics = (java.util.Map)((Success)try_).value();
                        consumerWrapper.offsets().$minus$minus$eq((IterableOnce)consumerWrapper.offsets().keySet().filter((Function1 & Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)MirrorMaker$.$anonfun$commitOffsets$3(visibleTopics, tp))));
                    } else if (try_ instanceof Failure) {
                        Throwable e = ((Failure)try_).exception();
                        if (this.logger().underlying().isWarnEnabled()) {
                            String msgWithLogIdent_msg = "Failed to list all authorized topics after committing offsets timed out: ";
                            Object var8_8 = null;
                            this.logger().underlying().warn(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), e);
                        }
                    } else {
                        throw new MatchError((Object)try_);
                    }
                    ++n;
                    if (this.logger().underlying().isWarnEnabled()) {
                        String msgWithLogIdent_msg = new StringBuilder(263).append("Failed to commit offsets because the offset commit request processing can not be completed in time. ").append("If you see this regularly, it could indicate that you need to increase the consumer's ").append("default.api.timeout.ms").append(" ").append("Last successful offset commit timestamp=").append(MODULE$.lastSuccessfulCommitTime()).append(", retry count=").append(n).toString();
                        Object var9_9 = null;
                        this.logger().underlying().warn(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                    }
                    Thread.sleep(100L);
                }
                catch (CommitFailedException commitFailedException) {
                    retryNeeded = false;
                    if (!this.logger().underlying().isWarnEnabled()) continue;
                    String msgWithLogIdent_msg = MirrorMaker$.$anonfun$commitOffsets$7();
                    Object var10_10 = null;
                    this.logger().underlying().warn(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                }
            }
            return;
        }
        if (this.logger().underlying().isInfoEnabled()) {
            String msgWithLogIdent_msg = "Exiting on send failure, skip committing offsets.";
            Object var11_12 = null;
            this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            return;
        }
    }

    public void cleanShutdown() {
        if (this.kafka$tools$MirrorMaker$$isShuttingDown().compareAndSet(false, true)) {
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = "Start clean shutdown.";
                Object var1_1 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = "Shutting down consumer threads.";
                Object var2_2 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            if (this.mirrorMakerThreads() != null) {
                this.mirrorMakerThreads().foreach((Function1 & Serializable)x$3 -> {
                    x$3.shutdown();
                    return BoxedUnit.UNIT;
                });
                this.mirrorMakerThreads().foreach((Function1 & Serializable)x$4 -> {
                    x$4.awaitShutdown();
                    return BoxedUnit.UNIT;
                });
            }
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = "Closing producer.";
                Object var3_3 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
            }
            this.producer().close();
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = "Kafka mirror maker shutdown successfully";
                Object var4_4 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                return;
            }
            return;
        }
    }

    public void kafka$tools$MirrorMaker$$maybeSetDefaultProperty(Properties properties, String propertyName, String defaultValue) {
        String propertyValue = properties.getProperty(propertyName);
        properties.setProperty(propertyName, (String)Option$.MODULE$.apply((Object)propertyValue).getOrElse((Function0 & Serializable)() -> defaultValue));
        String string = properties.getProperty(propertyName);
        if (string == null ? defaultValue != null : !string.equals(defaultValue)) {
            if (this.logger().underlying().isInfoEnabled()) {
                String msgWithLogIdent_msg = StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("Property %s is overridden to %s - data loss or message reordering is possible."), (Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{propertyName, propertyValue}));
                Object var5_5 = null;
                this.logger().underlying().info(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                return;
            }
            return;
        }
    }

    public static final /* synthetic */ int kafka$tools$MirrorMaker$$$anonfun$new$1() {
        return MODULE$.kafka$tools$MirrorMaker$$numDroppedMessages().get();
    }

    public static final /* synthetic */ String $anonfun$main$1() {
        return "This tool is deprecated and may be removed in a future major release.";
    }

    public static final /* synthetic */ String $anonfun$main$2() {
        return "Starting mirror maker";
    }

    public static final /* synthetic */ String $anonfun$main$3() {
        return "Exception when starting mirror maker.";
    }

    public static final /* synthetic */ Throwable $anonfun$main$4(Throwable t$1) {
        return t$1;
    }

    public static final /* synthetic */ KafkaConsumer $anonfun$createConsumers$1(Properties consumerConfigProps$1, String groupIdString$1, int i) {
        consumerConfigProps$1.setProperty("client.id", new StringBuilder(1).append(groupIdString$1).append("-").append(Integer.toString(i)).toString());
        return new KafkaConsumer(consumerConfigProps$1);
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$1() {
        return "Committing offsets.";
    }

    public static final /* synthetic */ boolean $anonfun$commitOffsets$3(java.util.Map visibleTopics$1, TopicPartition tp) {
        return !visibleTopics$1.containsKey(tp.topic());
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$4() {
        return "Failed to list all authorized topics after committing offsets timed out: ";
    }

    public static final /* synthetic */ Throwable $anonfun$commitOffsets$5(Throwable e$1) {
        return e$1;
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$6(IntRef retry$1) {
        return new StringBuilder(263).append("Failed to commit offsets because the offset commit request processing can not be completed in time. ").append("If you see this regularly, it could indicate that you need to increase the consumer's ").append("default.api.timeout.ms").append(" ").append("Last successful offset commit timestamp=").append(MODULE$.lastSuccessfulCommitTime()).append(", retry count=").append(retry$1.elem).toString();
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$7() {
        return new StringBuilder(303).append("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to another instance. If you see this regularly, it could indicate that you need to either increase ").append("the consumer's ").append("session.timeout.ms").append(" or reduce the number of records ").append("handled on each iteration with ").append("max.poll.records").toString();
    }

    public static final /* synthetic */ String $anonfun$commitOffsets$8() {
        return "Exiting on send failure, skip committing offsets.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$1() {
        return "Start clean shutdown.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$2() {
        return "Shutting down consumer threads.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$5() {
        return "Closing producer.";
    }

    public static final /* synthetic */ String $anonfun$cleanShutdown$6() {
        return "Kafka mirror maker shutdown successfully";
    }

    private MirrorMaker$() {
    }
}

