package org.apache.hedwig.client.benchmark;

import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.concurrent.Callable;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.benchmark.BenchmarkUtils;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/benchmark/BenchmarkSubscriber.class */
/**
 * Benchmark worker that subscribes to topics through a Hedwig {@link Subscriber} and
 * reports subscribe and receive throughput.
 *
 * <p>{@link #warmup(int)} and {@link #multiSub(String, String, int, int, int)} issue
 * asynchronous subscribe requests and block until the aggregator signals completion.
 * {@link #call()} subscribes synchronously, starts delivery on each topic this worker is
 * responsible for, and blocks until the receive aggregator signals completion.
 */
public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void> {
    static final Logger logger = LoggerFactory.getLogger(BenchmarkSubscriber.class);
    Subscriber subscriber;
    ByteString subId;

    /**
     * Creates a subscriber worker.
     *
     * <p>The six {@code int} arguments are forwarded, in order and unchanged, to the
     * {@code BenchmarkWorker} constructor; their meaning is defined there.
     *
     * @param subscriber client used for all subscribe and delivery calls
     * @param byteString subscriber id passed with every subscribe/startDelivery request
     */
    public BenchmarkSubscriber(int i, int i2, int i3, int i4, int i5, int i6, Subscriber subscriber, ByteString byteString) {
        super(i, i2, i3, i4, i5, i6);
        this.subscriber = subscriber;
        this.subId = byteString;
    }

    /**
     * Runs a warm-up round of asynchronous subscribes on topics named {@code "warmup" + index},
     * covering indexes {@code 0} (inclusive) to {@code i * numPartitions} (exclusive).
     *
     * @param i passed through to {@link #multiSub(String, String, int, int, int)} as its
     *          fourth argument and, multiplied by {@code numPartitions}, as the topic count
     * @throws InterruptedException if interrupted while waiting for the round to complete
     */
    public void warmup(int i) throws InterruptedException {
        multiSub("warmup", "warmup", 0, i, i * this.numPartitions);
    }

    /**
     * Subscribes to every topic in {@code [startTopicLabel, startTopicLabel + numTopics)} that
     * this worker is responsible for, starts delivery on each, then blocks until the
     * {@code "recvs"} aggregator's queue yields an element and prints its summary.
     *
     * <p>A message counts towards throughput only when its source sequence id is greater than
     * the last one accepted for the same topic and source region; anything else is logged as a
     * redelivery. Every message is acknowledged through its callback either way.
     *
     * @return always {@code null}
     * @throws Exception if subscribing or starting delivery fails, or the wait is interrupted
     */
    @Override
    public Void call() throws Exception {
        final BenchmarkUtils.ThroughputAggregator recvAggregator = new BenchmarkUtils.ThroughputAggregator("recvs", this.numMessages);
        recvAggregator.startProgress();

        // Highest source sequence id accepted so far, keyed by topic name + source region.
        // Shared by all handlers registered below, so every access is synchronized on the map.
        final HashMap<String, Long> lastSeqIdSeen = new HashMap<String, Long>();

        for (int label = this.startTopicLabel; label < this.startTopicLabel + this.numTopics; label++) {
            if (!HedwigBenchmark.amIResponsibleForTopic(label, this.partitionIndex, this.numPartitions)) {
                continue;
            }
            final String topic = "topic" + label;
            this.subscriber.subscribe(ByteString.copyFromUtf8(topic), this.subId, PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH);
            this.subscriber.startDelivery(ByteString.copyFromUtf8(topic), this.subId, new MessageHandler() {
                @Override
                public void deliver(ByteString deliveredTopic, ByteString subscriberId, PubSubProtocol.Message message, Callback<Void> callback, Object context) {
                    BenchmarkSubscriber.logger.debug("Got message from src-region: {} with seq-id: {}", message.getSrcRegion(), message.getMsgId());

                    final String key = topic + message.getSrcRegion().toStringUtf8();
                    final long seqId = BenchmarkSubscriber.this.getSrcSeqId(message);
                    final boolean redelivery;
                    // NOTE(review): which threads invoke this handler is not visible here; the
                    // read-compare-write is kept atomic so concurrent deliveries cannot corrupt
                    // the map or both accept the same sequence id.
                    synchronized (lastSeqIdSeen) {
                        Long previous = lastSeqIdSeen.get(key);
                        long lastAccepted = (previous == null) ? 0L : previous.longValue();
                        redelivery = seqId <= lastAccepted;
                        if (!redelivery) {
                            // Remember the new high-water mark; without this the check above
                            // would always compare against 0 and never detect a redelivery.
                            lastSeqIdSeen.put(key, Long.valueOf(seqId));
                        }
                    }

                    if (redelivery) {
                        BenchmarkSubscriber.logger.info("Redelivery of message, src-region: {}, seq-id: {}", message.getSrcRegion(), message.getMsgId());
                    } else {
                        recvAggregator.ding(false);
                    }
                    callback.operationFinished(context, null);
                }
            });
        }

        System.out.println("Finished subscribing to topics and now waiting for messages to come in...");
        // Blocks until the aggregator puts an element on its queue.
        recvAggregator.queue.take();
        System.out.println(recvAggregator.summarize(recvAggregator.earliest.get()));
        return null;
    }

    /**
     * Returns the sequence id the message carries for its own source region.
     *
     * @param message delivered message
     * @return the sequence id of the remote component whose region equals the message's source
     *         region, or the local component when no remote component matches (including when
     *         there are no remote components at all)
     */
    long getSrcSeqId(PubSubProtocol.Message message) {
        // An empty remote-component list simply skips the loop and falls through to the
        // local component, so no separate emptiness check is needed.
        for (PubSubProtocol.RegionSpecificSeqId remote : message.getMsgId().getRemoteComponentsList()) {
            if (remote.getRegion().equals(message.getSrcRegion())) {
                return remote.getSeqId();
            }
        }
        return message.getMsgId().getLocalComponent();
    }

    /**
     * Issues asynchronous subscribes for topics {@code str2 + index}, for each index in
     * {@code [i, i + i3)} that this worker is responsible for, then blocks until the
     * aggregator's queue yields an element. The summary is printed only when {@code i3 > 1}.
     *
     * @param str  label given to the {@code ThroughputLatencyAggregator}
     * @param str2 topic name prefix; the topic index is appended to it
     * @param i    first topic index (inclusive)
     * @param i2   passed as the third argument of the {@code ThroughputLatencyAggregator}
     *             constructor
     * @param i3   number of consecutive topic indexes to consider; {@code i3 / numPartitions}
     *             is passed as the aggregator's second constructor argument
     * @throws InterruptedException if interrupted while waiting for completion
     */
    void multiSub(String str, String str2, int i, int i2, int i3) throws InterruptedException {
        long startTime = MathUtils.now();
        BenchmarkUtils.ThroughputLatencyAggregator subAggregator = new BenchmarkUtils.ThroughputLatencyAggregator(str, i3 / this.numPartitions, i2);
        subAggregator.startProgress();

        int endIndex = i + i3;
        for (int index = i; index < endIndex; index++) {
            if (!HedwigBenchmark.amIResponsibleForTopic(index, this.partitionIndex, this.numPartitions)) {
                continue;
            }
            this.subscriber.asyncSubscribe(ByteString.copyFromUtf8(str2 + index), this.subId, PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH, new BenchmarkUtils.BenchmarkCallback(subAggregator), (Object) null);
        }

        // Blocks until the aggregator puts an element on its queue.
        subAggregator.tpAgg.queue.take();
        if (i3 > 1) {
            System.out.println(subAggregator.summarize(startTime));
        }
    }
}
