/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.handlers;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import com.wavefront.agent.handlers.AbstractReportableEntityHandler;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.SenderTask;
import com.wavefront.api.agent.ValidationConfiguration;
import com.wavefront.common.Clock;
import com.wavefront.common.HostMetricTagsPair;
import com.wavefront.common.Utils;
import com.wavefront.data.DeltaCounterValueException;
import com.wavefront.data.Validation;
import com.wavefront.ingester.ReportPointSerializer;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.BurstRateTrackingCounter;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.DeltaCounter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import wavefront.report.ReportPoint;

public class DeltaCounterAccumulationHandlerImpl
extends AbstractReportableEntityHandler<ReportPoint, String> {
    private final ValidationConfiguration validationConfig;
    private final Logger validItemsLogger;
    final Histogram receivedPointLag;
    private final BurstRateTrackingCounter reportedStats;
    private final Supplier<Counter> discardedCounterSupplier;
    private final Cache<HostMetricTagsPair, AtomicDouble> aggregatedDeltas;
    private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
    private final Timer receivedRateTimer;

    public DeltaCounterAccumulationHandlerImpl(HandlerKey handlerKey, int blockedItemsPerBatch, final @Nullable Map<String, Collection<SenderTask<String>>> senderTaskMap, @Nonnull ValidationConfiguration validationConfig, long aggregationIntervalSeconds, final @Nullable BiConsumer<String, Long> receivedRateSink, @Nullable Logger blockedItemLogger, @Nullable Logger validItemsLogger) {
        super(handlerKey, blockedItemsPerBatch, new ReportPointSerializer(), senderTaskMap, true, (BiConsumer<String, Long>)null, blockedItemLogger);
        super.initializeCounters();
        this.validationConfig = validationConfig;
        this.validItemsLogger = validItemsLogger;
        this.aggregatedDeltas = Caffeine.newBuilder().expireAfterAccess(5L * aggregationIntervalSeconds, TimeUnit.SECONDS).removalListener((metric, value, reason) -> this.reportAggregatedDeltaValue((HostMetricTagsPair)metric, (AtomicDouble)value)).build();
        this.receivedPointLag = Metrics.newHistogram((MetricName)new MetricName("points." + handlerKey.getHandle() + ".received", "", "lag"), (boolean)false);
        this.reporter.scheduleWithFixedDelay(this::flushDeltaCounters, aggregationIntervalSeconds, aggregationIntervalSeconds, TimeUnit.SECONDS);
        String metricPrefix = handlerKey.toString();
        this.reportedStats = new BurstRateTrackingCounter(new MetricName(metricPrefix, "", "sent"), Metrics.defaultRegistry(), 1000);
        this.discardedCounterSupplier = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName(metricPrefix, "", "discarded")));
        Metrics.newGauge((MetricName)new MetricName(metricPrefix, "", "accumulator.size"), (Gauge)new Gauge<Long>(){

            public Long value() {
                return DeltaCounterAccumulationHandlerImpl.this.aggregatedDeltas.estimatedSize();
            }
        });
        if (receivedRateSink == null) {
            this.receivedRateTimer = null;
        } else {
            this.receivedRateTimer = new Timer("delta-counter-timer-" + handlerKey.getHandle());
            this.receivedRateTimer.scheduleAtFixedRate(new TimerTask(){

                @Override
                public void run() {
                    for (String tenantName : senderTaskMap.keySet()) {
                        receivedRateSink.accept(tenantName, DeltaCounterAccumulationHandlerImpl.this.receivedStats.getCurrentRate());
                    }
                }
            }, 1000L, 1000L);
        }
    }

    @VisibleForTesting
    public void flushDeltaCounters() {
        this.aggregatedDeltas.asMap().forEach(this::reportAggregatedDeltaValue);
    }

    private void reportAggregatedDeltaValue(@Nullable HostMetricTagsPair hostMetricTagsPair, @Nullable AtomicDouble value) {
        if (value == null || hostMetricTagsPair == null) {
            return;
        }
        this.reportedStats.inc();
        double reportedValue = value.getAndSet(0.0);
        if (reportedValue == 0.0) {
            return;
        }
        String strPoint = com.wavefront.sdk.common.Utils.metricToLineData((String)hostMetricTagsPair.metric, (double)reportedValue, (Long)Clock.now(), (String)hostMetricTagsPair.getHost(), hostMetricTagsPair.getTags(), (String)"wavefront-proxy");
        this.getTask("central").add(strPoint);
        if (this.isMulticastingActive && hostMetricTagsPair.getTags() != null && hostMetricTagsPair.getTags().containsKey("multicastingTenantName")) {
            String[] multicastingTenantNames = hostMetricTagsPair.getTags().get("multicastingTenantName").trim().split(",");
            hostMetricTagsPair.getTags().remove("multicastingTenantName");
            for (String multicastingTenantName : multicastingTenantNames) {
                if (this.getTask(multicastingTenantName) == null) continue;
                this.getTask(multicastingTenantName).add(com.wavefront.sdk.common.Utils.metricToLineData((String)hostMetricTagsPair.metric, (double)reportedValue, (Long)Clock.now(), (String)hostMetricTagsPair.getHost(), hostMetricTagsPair.getTags(), (String)"wavefront-proxy"));
            }
        }
    }

    @Override
    void reportInternal(ReportPoint point) {
        if (DeltaCounter.isDelta((String)point.getMetric())) {
            try {
                Validation.validatePoint((ReportPoint)point, (ValidationConfiguration)this.validationConfig);
            }
            catch (DeltaCounterValueException e) {
                this.discardedCounterSupplier.get().inc();
                return;
            }
            this.getReceivedCounter().inc();
            double deltaValue = (Double)point.getValue();
            this.receivedPointLag.update(Clock.now() - point.getTimestamp());
            HostMetricTagsPair hostMetricTagsPair = new HostMetricTagsPair(point.getHost(), point.getMetric(), point.getAnnotations());
            Objects.requireNonNull((AtomicDouble)this.aggregatedDeltas.get((Object)hostMetricTagsPair, key -> new AtomicDouble(0.0))).getAndAdd(deltaValue);
            if (this.validItemsLogger != null && this.validItemsLogger.isLoggable(Level.FINEST)) {
                this.validItemsLogger.info((String)this.serializer.apply(point));
            }
        } else {
            this.reject(point, "Port is not configured to accept non-delta counter data!");
        }
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.reporter.shutdown();
        if (this.receivedRateTimer != null) {
            this.receivedRateTimer.cancel();
        }
    }
}

