/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.Serializable;
import java.util.Properties;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.tools.MirrorMaker;
import kafka.tools.MirrorMaker$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Assert;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\u001d3AAB\u0004\u0001\u0019!)1\u0003\u0001C\u0001)!)q\u0003\u0001C!1!)q\u0005\u0001C\u0001Q!)1\t\u0001C\u0001Q!)Q\t\u0001C\u0001Q\tQR*\u001b:s_Jl\u0015m[3s\u0013:$Xm\u001a:bi&|g\u000eV3ti*\u0011\u0001\"C\u0001\u0006i>|Gn\u001d\u0006\u0002\u0015\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u000e!\tq\u0011#D\u0001\u0010\u0015\t\u0001\u0012\"A\u0006j]R,wM]1uS>t\u0017B\u0001\n\u0010\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001\u0016!\t1\u0002!D\u0001\b\u0003=9WM\\3sCR,7i\u001c8gS\u001e\u001cX#A\r\u0011\u0007iy\u0012%D\u0001\u001c\u0015\taR$\u0001\u0006d_2dWm\u0019;j_:T\u0011AH\u0001\u0006g\u000e\fG.Y\u0005\u0003Am\u00111aU3r!\t\u0011S%D\u0001$\u0015\t!\u0013\"\u0001\u0004tKJ4XM]\u0005\u0003M\r\u00121bS1gW\u0006\u001cuN\u001c4jO\u00061C/Z:u\u0007>lW.\u001b;PM\u001a\u001cX\r^:UQJ|w\u000fV5nK>,H/\u0012=dKB$\u0018n\u001c8\u0015\u0003%\u0002\"AK\u0016\u000e\u0003uI!\u0001L\u000f\u0003\tUs\u0017\u000e\u001e\u0015\u0005\u000792t\u0007\u0005\u00020i5\t\u0001G\u0003\u00022e\u0005)!.\u001e8ji*\t1'A\u0002pe\u001eL!!\u000e\u0019\u0003\tQ+7\u000f^\u0001\tKb\u0004Xm\u0019;fI\u000e\n\u0001\b\u0005\u0002:\u00036\t!H\u0003\u0002<y\u00051QM\u001d:peNT!!\u0010 \u0002\r\r|W.\\8o\u0015\tQqH\u0003\u0002Ae\u00051\u0011\r]1dQ\u0016L!A\u0011\u001e\u0003!QKW.Z8vi\u0016C8-\u001a9uS>t\u0017\u0001\u000b;fgR\u001cu.\\7ji>3gm]3ugJ+Wn\u001c<f\u001d>tW\t_5ti\u0016tG\u000fV8qS\u000e\u001c\bF\u0001\u0003/\u0003]!Xm\u001d;D_6l\u0017mU3qCJ\fG/\u001a3SK\u001e,\u0007\u0010\u000b\u0002\u0006]\u0001")
public class MirrorMakerIntegrationTest
extends KafkaServerTestHarness {
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map((Function1 & Serializable & scala.Serializable)x$1 -> KafkaConfig$.MODULE$.fromProps(x$1, new Properties()), Seq$.MODULE$.canBuildFrom());
    }

    @Test(expected=TimeoutException.class)
    public void testCommitOffsetsThrowTimeoutException() {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", this.brokerList());
        consumerProps.put("default.api.timeout.ms", "1");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"any"));
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("test", 0), (Object)BoxesRunTime.boxToLong((long)0L));
        mirrorMakerConsumer.commit();
    }

    @Test
    public void testCommitOffsetsRemoveNonExistentTopics() {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", this.brokerList());
        consumerProps.put("default.api.timeout.ms", "2000");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"any"));
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("nonexistent-topic1", 0), (Object)BoxesRunTime.boxToLong((long)0L));
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("nonexistent-topic2", 0), (Object)BoxesRunTime.boxToLong((long)0L));
        MirrorMaker$.MODULE$.commitOffsets(mirrorMakerConsumer);
        Assert.assertTrue((String)"Offsets for non-existent topics should be removed", (boolean)mirrorMakerConsumer.offsets().isEmpty());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testCommaSeparatedRegex() {
        String topic = "new-topic";
        String msg = "a test message";
        String brokerList = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", brokerList);
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        MirrorMaker.MirrorMakerProducer producer = new MirrorMaker.MirrorMakerProducer(true, producerProps);
        MirrorMaker$.MODULE$.producer_$eq(producer);
        MirrorMaker$.MODULE$.producer().send(new ProducerRecord(topic, (Object)msg.getBytes()));
        MirrorMaker$.MODULE$.producer().close();
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", brokerList);
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"another_topic,new.*,foo"));
        mirrorMakerConsumer.init();
        try {
            long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!MirrorMakerIntegrationTest.$anonfun$testCommaSeparatedRegex$1(mirrorMakerConsumer, topic, msg)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    throw Assertions$.MODULE$.fail(MirrorMakerIntegrationTest.$anonfun$testCommaSeparatedRegex$2(), new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 842));
                }
                if (Predef$.MODULE$ == null) {
                    throw null;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension((long)waitUntilTrue_waitTimeMs, (long)waitUntilTrue_pause));
            }
        }
        finally {
            consumer.close();
        }
    }

    public static final /* synthetic */ boolean $anonfun$testCommaSeparatedRegex$1(MirrorMaker.ConsumerWrapper mirrorMakerConsumer$1, String topic$1, String msg$1) {
        boolean bl;
        try {
            ConsumerRecord data = mirrorMakerConsumer$1.receive();
            String string = data.topic();
            if ((string != null ? !string.equals(topic$1) : topic$1 != null) || !new String((byte[])data.value()).equals(msg$1)) {
                return false;
            }
            bl = true;
        }
        catch (MirrorMaker.NoRecordsException noRecordsException) {
            bl = false;
        }
        return bl;
    }

    public static final /* synthetic */ String $anonfun$testCommaSeparatedRegex$2() {
        return "MirrorMaker consumer should read the expected message from the expected topic within the timeout";
    }
}

