/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.OutputStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.metrics.KafkaMetricsGroup;
import kafka.metrics.KafkaMetricsGroup$class;
import kafka.tools.MirrorMaker;
import kafka.tools.MirrorMaker$defaultMirrorMakerMessageHandler$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.CoreUtils$;
import kafka.utils.Logging$class;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.sys.package$;
import scala.util.control.ControlThrowable;

public final class MirrorMaker$
implements KafkaMetricsGroup {
    public static final MirrorMaker$ MODULE$;
    private MirrorMaker.MirrorMakerProducer producer;
    private Seq<MirrorMaker.MirrorMakerThread> mirrorMakerThreads;
    private final AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown;
    private final AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages;
    private MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler;
    private int kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    private boolean kafka$tools$MirrorMaker$$abortOnSendFailure;
    private volatile boolean kafka$tools$MirrorMaker$$exitingOnSendFailure;
    private long kafka$tools$MirrorMaker$$lastSuccessfulCommitTime;
    private final Time time;
    private final Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    static {
        new MirrorMaker$();
    }

    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup$class.metricName(this, name, tags);
    }

    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup$class.explicitMetricName(this, group, typeName, name, tags);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newGauge(this, name, metric, tags);
    }

    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newMeter(this, name, eventType, timeUnit, tags);
    }

    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newHistogram(this, name, biased, tags);
    }

    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newTimer(this, name, durationUnit, rateUnit, tags);
    }

    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup$class.removeMetric(this, name, tags);
    }

    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup$class.newGauge$default$3(this);
    }

    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup$class.newMeter$default$4(this);
    }

    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup$class.removeMetric$default$2(this);
    }

    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup$class.newTimer$default$4(this);
    }

    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup$class.newHistogram$default$2(this);
    }

    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup$class.newHistogram$default$3(this);
    }

    private Logger logger$lzycompute() {
        MirrorMaker$ mirrorMaker$ = this;
        synchronized (mirrorMaker$) {
            if (!this.bitmap$0) {
                this.logger = Logging$class.logger(this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public String loggerName() {
        return Logging$class.loggerName(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging$class.msgWithLogIdent(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging$class.isDebugEnabled(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging$class.isTraceEnabled(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    public MirrorMaker.MirrorMakerProducer producer() {
        return this.producer;
    }

    public void producer_$eq(MirrorMaker.MirrorMakerProducer x$1) {
        this.producer = x$1;
    }

    private Seq<MirrorMaker.MirrorMakerThread> mirrorMakerThreads() {
        return this.mirrorMakerThreads;
    }

    private void mirrorMakerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> x$1) {
        this.mirrorMakerThreads = x$1;
    }

    public AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown() {
        return this.kafka$tools$MirrorMaker$$isShuttingDown;
    }

    public AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages() {
        return this.kafka$tools$MirrorMaker$$numDroppedMessages;
    }

    public MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler() {
        return this.kafka$tools$MirrorMaker$$messageHandler;
    }

    private void kafka$tools$MirrorMaker$$messageHandler_$eq(MirrorMaker.MirrorMakerMessageHandler x$1) {
        this.kafka$tools$MirrorMaker$$messageHandler = x$1;
    }

    public int kafka$tools$MirrorMaker$$offsetCommitIntervalMs() {
        return this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    }

    private void kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq(int x$1) {
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$abortOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$abortOnSendFailure;
    }

    private void kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$exitingOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$exitingOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$exitingOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = x$1;
    }

    public long kafka$tools$MirrorMaker$$lastSuccessfulCommitTime() {
        return this.kafka$tools$MirrorMaker$$lastSuccessfulCommitTime;
    }

    private void kafka$tools$MirrorMaker$$lastSuccessfulCommitTime_$eq(long x$1) {
        this.kafka$tools$MirrorMaker$$lastSuccessfulCommitTime = x$1;
    }

    private Time time() {
        return this.time;
    }

    public void main(String[] args) {
        Throwable throwable2;
        block8: {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Starting mirror maker";
                }
            });
            try {
                String rebalanceListenerArgs;
                OptionParser parser = new OptionParser(false);
                ArgumentAcceptingOptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Embedded consumer config for consuming from the source cluster.").withRequiredArg().describedAs("config file").ofType(String.class);
                parser.accepts("new.consumer", "DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in a future version).");
                ArgumentAcceptingOptionSpec producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
                ArgumentAcceptingOptionSpec numStreamsOpt = parser.accepts("num.streams", "Number of consumption streams.").withRequiredArg().describedAs("Number of threads").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
                ArgumentAcceptingOptionSpec whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
                ArgumentAcceptingOptionSpec offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms", "Offset commit interval in ms.").withRequiredArg().describedAs("offset commit interval in millisecond").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(60000), (Object[])new Integer[0]);
                ArgumentAcceptingOptionSpec consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener", "The consumer rebalance listener to use for mirror maker consumer.").withRequiredArg().describedAs("A custom rebalance listener of type ConsumerRebalanceListener").ofType(String.class);
                ArgumentAcceptingOptionSpec rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args", "Arguments used by custom rebalance listener for mirror maker consumer.").withRequiredArg().describedAs("Arguments passed to custom rebalance listener constructor as a string.").ofType(String.class);
                ArgumentAcceptingOptionSpec messageHandlerOpt = parser.accepts("message.handler", "Message handler which will process every record in-between consumer and producer.").withRequiredArg().describedAs("A custom message handler of type MirrorMakerMessageHandler").ofType(String.class);
                ArgumentAcceptingOptionSpec messageHandlerArgsOpt = parser.accepts("message.handler.args", "Arguments used by custom message handler for mirror maker.").withRequiredArg().describedAs("Arguments passed to message handler constructor.").ofType(String.class);
                ArgumentAcceptingOptionSpec abortOnSendFailureOpt = parser.accepts("abort.on.send.failure", "Configure the mirror maker to exit on a failed send.").withRequiredArg().describedAs("Stop the entire mirror maker when a send failure occurs").ofType(String.class).defaultsTo((Object)"true", (Object[])new String[0]);
                OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");
                if (args.length == 0) {
                    throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.");
                }
                OptionSet options = parser.parse(args);
                if (options.has((OptionSpec)helpOpt)) {
                    parser.printHelpOn((OutputStream)System.out);
                    throw package$.MODULE$.exit(0);
                }
                CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq<OptionSpec<?>>)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{consumerConfigOpt, producerConfigOpt}));
                Properties consumerProps = Utils.loadProps((String)((String)options.valueOf((OptionSpec)consumerConfigOpt)));
                if (options.has((OptionSpec)whitelistOpt)) {
                    if (!consumerProps.containsKey("partition.assignment.strategy")) {
                        System.err.println("WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'");
                    }
                } else {
                    this.error((Function0<String>)new Serializable(){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return "whitelist must be specified";
                        }
                    });
                    throw package$.MODULE$.exit(1);
                }
                this.kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(new StringOps(Predef$.MODULE$.augmentString((String)options.valueOf((OptionSpec)abortOnSendFailureOpt))).toBoolean());
                this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq((Integer)options.valueOf((OptionSpec)offsetCommitIntervalMsOpt));
                int numStreams = (Integer)options.valueOf((OptionSpec)numStreamsOpt);
                Runtime.getRuntime().addShutdownHook(new Thread(){

                    public void run() {
                        MirrorMaker$.MODULE$.cleanShutdown();
                    }
                });
                Properties producerProps = Utils.loadProps((String)((String)options.valueOf((OptionSpec)producerConfigOpt)));
                boolean sync = producerProps.getProperty("producer.type", "async").equals("sync");
                producerProps.remove("producer.type");
                this.maybeSetDefaultProperty(producerProps, "delivery.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE)).toString());
                this.maybeSetDefaultProperty(producerProps, "max.block.ms", ((Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE)).toString());
                this.maybeSetDefaultProperty(producerProps, "acks", "all");
                this.maybeSetDefaultProperty(producerProps, "max.in.flight.requests.per.connection", "1");
                producerProps.setProperty("key.serializer", ByteArraySerializer.class.getName());
                producerProps.setProperty("value.serializer", ByteArraySerializer.class.getName());
                this.producer_$eq(new MirrorMaker.MirrorMakerProducer(sync, producerProps));
                String customRebalanceListenerClass = (String)options.valueOf((OptionSpec)consumerRebalanceListenerOpt);
                None$ customRebalanceListener = customRebalanceListenerClass == null ? None$.MODULE$ : ((rebalanceListenerArgs = (String)options.valueOf((OptionSpec)rebalanceListenerArgsOpt)) == null ? new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[0]))) : new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[]{rebalanceListenerArgs}))));
                Seq<MirrorMaker.ConsumerWrapper> mirrorMakerConsumers = this.createConsumers(numStreams, consumerProps, (Option<ConsumerRebalanceListener>)customRebalanceListener, (Option<String>)Option$.MODULE$.apply(options.valueOf((OptionSpec)whitelistOpt)));
                this.mirrorMakerThreads_$eq((Seq<MirrorMaker.MirrorMakerThread>)((Seq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(mirrorMakerConsumers){
                    public static final long serialVersionUID = 0L;
                    private final Seq mirrorMakerConsumers$1;

                    public final MirrorMaker.MirrorMakerThread apply(int i) {
                        return new MirrorMaker.MirrorMakerThread((MirrorMaker.ConsumerWrapper)this.mirrorMakerConsumers$1.apply(i), i);
                    }
                    {
                        this.mirrorMakerConsumers$1 = mirrorMakerConsumers$1;
                    }
                }, IndexedSeq$.MODULE$.canBuildFrom())));
                String customMessageHandlerClass = (String)options.valueOf((OptionSpec)messageHandlerOpt);
                String messageHandlerArgs = (String)options.valueOf((OptionSpec)messageHandlerArgsOpt);
                this.kafka$tools$MirrorMaker$$messageHandler_$eq(customMessageHandlerClass == null ? MirrorMaker$defaultMirrorMakerMessageHandler$.MODULE$ : (messageHandlerArgs == null ? (MirrorMaker.MirrorMakerMessageHandler)CoreUtils$.MODULE$.createObject(customMessageHandlerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[0])) : (MirrorMaker.MirrorMakerMessageHandler)CoreUtils$.MODULE$.createObject(customMessageHandlerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[]{messageHandlerArgs}))));
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                if (throwable3 instanceof ControlThrowable) {
                    ControlThrowable controlThrowable = (ControlThrowable)throwable3;
                    throw (Throwable)controlThrowable;
                }
                if (throwable3 == null) break block8;
                Throwable throwable4 = throwable3;
                this.error((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Exception when starting mirror maker.";
                    }
                }, (Function0<Throwable>)new Serializable(throwable4){
                    public static final long serialVersionUID = 0L;
                    private final Throwable x8$1;

                    public final Throwable apply() {
                        return this.x8$1;
                    }
                    {
                        this.x8$1 = x8$1;
                    }
                });
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(MirrorMaker.MirrorMakerThread x$1) {
                    x$1.start();
                }
            });
            this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(MirrorMaker.MirrorMakerThread x$2) {
                    x$2.awaitShutdown();
                }
            });
            return;
        }
        throw throwable2;
    }

    public Seq<MirrorMaker.ConsumerWrapper> createConsumers(int numStreams, Properties consumerConfigProps, Option<ConsumerRebalanceListener> customRebalanceListener, Option<String> whitelist) {
        this.maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false");
        consumerConfigProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfigProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        String groupIdString = consumerConfigProps.getProperty("group.id");
        IndexedSeq consumers = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(consumerConfigProps, groupIdString){
            public static final long serialVersionUID = 0L;
            private final Properties consumerConfigProps$1;
            private final String groupIdString$1;

            public final KafkaConsumer<byte[], byte[]> apply(int i) {
                this.consumerConfigProps$1.setProperty("client.id", new StringBuilder().append((Object)this.groupIdString$1).append((Object)"-").append((Object)((Object)BoxesRunTime.boxToInteger((int)i)).toString()).toString());
                return new KafkaConsumer(this.consumerConfigProps$1);
            }
            {
                this.consumerConfigProps$1 = consumerConfigProps$1;
                this.groupIdString$1 = groupIdString$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        whitelist.getOrElse((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new IllegalArgumentException("White list cannot be empty");
            }
        });
        return (Seq)consumers.map((Function1)new Serializable(customRebalanceListener, whitelist){
            public static final long serialVersionUID = 0L;
            private final Option customRebalanceListener$1;
            private final Option whitelist$1;

            public final MirrorMaker.ConsumerWrapper apply(KafkaConsumer<byte[], byte[]> consumer) {
                return new MirrorMaker.ConsumerWrapper((Consumer<byte[], byte[]>)consumer, (Option<ConsumerRebalanceListener>)this.customRebalanceListener$1, (Option<String>)this.whitelist$1);
            }
            {
                this.customRebalanceListener$1 = customRebalanceListener$1;
                this.whitelist$1 = whitelist$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /*
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void commitOffsets(MirrorMaker.ConsumerWrapper consumerWrapper) {
        if (this.kafka$tools$MirrorMaker$$exitingOnSendFailure()) {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Exiting on send failure, skip committing offsets.";
                }
            });
            return;
        }
        IntRef retry = IntRef.create((int)0);
        boolean retryNeeded = true;
        while (retryNeeded) {
            this.trace((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Committing offsets.";
                }
            });
            try {
                consumerWrapper.commit();
                this.kafka$tools$MirrorMaker$$lastSuccessfulCommitTime_$eq(this.time().milliseconds());
                retryNeeded = false;
            }
            catch (CommitFailedException commitFailedException) {
                retryNeeded = false;
                this.warn((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return new StringBuilder().append((Object)"Failed to commit offsets because the consumer group has rebalanced and assigned partitions to another instance. If you see this regularly, it could indicate that you need to either increase ").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"the consumer's ", " or reduce the number of records "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"session.timeout.ms"}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"handled on each iteration with ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"max.poll.records"}))).toString();
                    }
                });
            }
        }
        return;
        {
            catch (WakeupException wakeupException) {
                this.commitOffsets(consumerWrapper);
                throw wakeupException;
            }
        }
    }

    public void cleanShutdown() {
        if (this.kafka$tools$MirrorMaker$$isShuttingDown().compareAndSet(false, true)) {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Start clean shutdown.";
                }
            });
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Shutting down consumer threads.";
                }
            });
            if (this.mirrorMakerThreads() != null) {
                this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(MirrorMaker.MirrorMakerThread x$4) {
                        x$4.shutdown();
                    }
                });
                this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(MirrorMaker.MirrorMakerThread x$5) {
                        x$5.awaitShutdown();
                    }
                });
            }
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Closing producer.";
                }
            });
            this.producer().close();
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Kafka mirror maker shutdown successfully";
                }
            });
        }
    }

    private void maybeSetDefaultProperty(Properties properties, String propertyName, String defaultValue) {
        String propertyValue = properties.getProperty(propertyName);
        properties.setProperty(propertyName, (String)Option$.MODULE$.apply((Object)propertyValue).getOrElse((Function0)new Serializable(defaultValue){
            public static final long serialVersionUID = 0L;
            private final String defaultValue$1;

            public final String apply() {
                return this.defaultValue$1;
            }
            {
                this.defaultValue$1 = defaultValue$1;
            }
        }));
        String string = properties.getProperty(propertyName);
        String string2 = defaultValue;
        if (string == null ? string2 != null : !string.equals(string2)) {
            this.info((Function0<String>)new Serializable(propertyName, propertyValue){
                public static final long serialVersionUID = 0L;
                private final String propertyName$1;
                private final String propertyValue$1;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Property %s is overridden to %s - data loss or message reordering is possible.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.propertyName$1, this.propertyValue$1}));
                }
                {
                    this.propertyName$1 = propertyName$1;
                    this.propertyValue$1 = propertyValue$1;
                }
            });
        }
    }

    private MirrorMaker$() {
        MODULE$ = this;
        Logging$class.$init$(this);
        KafkaMetricsGroup$class.$init$(this);
        this.producer = null;
        this.mirrorMakerThreads = null;
        this.kafka$tools$MirrorMaker$$isShuttingDown = new AtomicBoolean(false);
        this.kafka$tools$MirrorMaker$$numDroppedMessages = new AtomicInteger(0);
        this.kafka$tools$MirrorMaker$$messageHandler = null;
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = 0;
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = true;
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = false;
        this.kafka$tools$MirrorMaker$$lastSuccessfulCommitTime = -1L;
        this.time = Time.SYSTEM;
        this.newGauge("MirrorMaker-numDroppedMessages", new Gauge<Object>(){

            public int value() {
                return MirrorMaker$.MODULE$.kafka$tools$MirrorMaker$$numDroppedMessages().get();
            }
        }, this.newGauge$default$3());
    }
}

