/*
 * Decompiled with CFR 0.152.
 */
package other.kafka;

import java.io.OutputStream;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import other.kafka.TestOffsetManager;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.util.Random;

public final class TestOffsetManager$ {
    public static final TestOffsetManager$ MODULE$;
    private final Random random;
    private final int SocketTimeoutMs;

    static {
        new TestOffsetManager$();
    }

    public Random random() {
        return this.random;
    }

    public int SocketTimeoutMs() {
        return this.SocketTimeoutMs;
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec zookeeperOpt = parser.accepts("zookeeper", "The ZooKeeper connection URL.").withRequiredArg().describedAs("ZooKeeper URL").ofType(String.class).defaultsTo((Object)"localhost:2181", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec commitIntervalOpt = parser.accepts("commit-interval-ms", "Offset commit interval.").withRequiredArg().describedAs("interval").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(100), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec fetchIntervalOpt = parser.accepts("fetch-interval-ms", "Offset fetch interval.").withRequiredArg().describedAs("interval").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec numPartitionsOpt = parser.accepts("partition-count", "Number of partitions per commit.").withRequiredArg().describedAs("interval").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec numThreadsOpt = parser.accepts("thread-count", "Number of commit threads.").withRequiredArg().describedAs("threads").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec reportingIntervalOpt = parser.accepts("reporting-interval-ms", "Interval at which stats are reported.").withRequiredArg().describedAs("interval (ms)").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(3000), (Object[])new Integer[0]);
        OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");
        OptionSet options = parser.parse(args);
        if (options.has((OptionSpec)helpOpt)) {
            parser.printHelpOn((OutputStream)System.out);
            System.exit(0);
        }
        int commitIntervalMs = (Integer)options.valueOf((OptionSpec)commitIntervalOpt);
        int fetchIntervalMs = (Integer)options.valueOf((OptionSpec)fetchIntervalOpt);
        int threadCount = (Integer)options.valueOf((OptionSpec)numThreadsOpt);
        int partitionCount = (Integer)options.valueOf((OptionSpec)numPartitionsOpt);
        String zookeeper = (String)options.valueOf((OptionSpec)zookeeperOpt);
        int reportingIntervalMs = (Integer)options.valueOf((OptionSpec)reportingIntervalOpt);
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)threadCount), BoxesRunTime.boxToInteger((int)partitionCount), BoxesRunTime.boxToInteger((int)commitIntervalMs), BoxesRunTime.boxToInteger((int)fetchIntervalMs), BoxesRunTime.boxToInteger((int)reportingIntervalMs)})));
        ObjectRef zkClient = ObjectRef.create(null);
        ObjectRef commitThreads = ObjectRef.create((Object)((Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$)));
        ObjectRef fetchThread = ObjectRef.create(null);
        ObjectRef statsThread = ObjectRef.create(null);
        try {
            try {
                zkClient.elem = new ZkClient(zookeeper, 6000, 2000, (ZkSerializer)ZKStringSerializer$.MODULE$);
                commitThreads.elem = (Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), threadCount - 1).map((Function1)new Serializable(commitIntervalMs, partitionCount, zkClient){
                    public static final long serialVersionUID = 0L;
                    private final int commitIntervalMs$1;
                    private final int partitionCount$1;
                    private final ObjectRef zkClient$1;

                    public final TestOffsetManager.CommitThread apply(int threadId) {
                        return new TestOffsetManager.CommitThread(threadId, this.partitionCount$1, this.commitIntervalMs$1, (ZkClient)this.zkClient$1.elem);
                    }
                    {
                        this.commitIntervalMs$1 = commitIntervalMs$1;
                        this.partitionCount$1 = partitionCount$1;
                        this.zkClient$1 = zkClient$1;
                    }
                }, IndexedSeq$.MODULE$.canBuildFrom());
                fetchThread.elem = new TestOffsetManager.FetchThread(threadCount, fetchIntervalMs, (ZkClient)zkClient.elem);
                TestOffsetManager.StatsThread statsThread2 = new TestOffsetManager.StatsThread(reportingIntervalMs, (Seq<TestOffsetManager.CommitThread>)((Seq)commitThreads.elem), (TestOffsetManager.FetchThread)((Object)fetchThread.elem));
                Runtime.getRuntime().addShutdownHook(new Thread(zkClient, commitThreads, fetchThread, statsThread, statsThread2){
                    private final ObjectRef zkClient$1;
                    private final ObjectRef commitThreads$1;
                    private final ObjectRef fetchThread$1;
                    private final ObjectRef statsThread$1;
                    private final TestOffsetManager.StatsThread statsThread$2;

                    public void run() {
                        TestOffsetManager$.MODULE$.other$kafka$TestOffsetManager$$cleanShutdown$1(this.zkClient$1, this.commitThreads$1, this.fetchThread$1, this.statsThread$1);
                        this.statsThread$2.printStats();
                    }
                    {
                        this.zkClient$1 = zkClient$1;
                        this.commitThreads$1 = commitThreads$1;
                        this.fetchThread$1 = fetchThread$1;
                        this.statsThread$1 = statsThread$1;
                        this.statsThread$2 = statsThread$2;
                    }
                });
                ((Seq)commitThreads.elem).foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(TestOffsetManager.CommitThread x$11) {
                        x$11.start();
                    }
                });
                ((TestOffsetManager.FetchThread)((Object)fetchThread.elem)).start();
                statsThread2.start();
                ((Seq)commitThreads.elem).foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(TestOffsetManager.CommitThread x$12) {
                        x$12.join();
                    }
                });
                ((TestOffsetManager.FetchThread)((Object)fetchThread.elem)).join();
                statsThread2.join();
            }
            catch (Throwable throwable) {
                Predef$.MODULE$.println((Object)new Tuple2((Object)"Error: ", (Object)throwable));
            }
        }
        finally {
            this.other$kafka$TestOffsetManager$$cleanShutdown$1(zkClient, commitThreads, fetchThread, statsThread);
        }
    }

    public final void other$kafka$TestOffsetManager$$cleanShutdown$1(ObjectRef zkClient$1, ObjectRef commitThreads$1, ObjectRef fetchThread$1, ObjectRef statsThread$1) {
        ((Seq)commitThreads$1.elem).foreach((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(TestOffsetManager.CommitThread x$13) {
                x$13.shutdown();
            }
        });
        ((Seq)commitThreads$1.elem).foreach((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(TestOffsetManager.CommitThread x$14) {
                x$14.join();
            }
        });
        if ((TestOffsetManager.FetchThread)((Object)fetchThread$1.elem) != null) {
            ((TestOffsetManager.FetchThread)((Object)fetchThread$1.elem)).shutdown();
            ((TestOffsetManager.FetchThread)((Object)fetchThread$1.elem)).join();
        }
        if ((TestOffsetManager.StatsThread)((Object)statsThread$1.elem) != null) {
            ((TestOffsetManager.StatsThread)((Object)statsThread$1.elem)).shutdown();
            ((TestOffsetManager.StatsThread)((Object)statsThread$1.elem)).join();
        }
        ((ZkClient)zkClient$1.elem).close();
    }

    private TestOffsetManager$() {
        MODULE$ = this;
        this.random = new Random();
        this.SocketTimeoutMs = 10000;
    }
}

