/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.listeners;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.wavefront.agent.LogsUtil;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.ChannelUtils;
import com.wavefront.agent.channel.HealthCheckManager;
import com.wavefront.agent.channel.SharedGraphiteHostAnnotator;
import com.wavefront.agent.formatter.DataFormat;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.ReportableEntityHandler;
import com.wavefront.agent.handlers.ReportableEntityHandlerFactory;
import com.wavefront.agent.listeners.AbstractLineDelimitedHandler;
import com.wavefront.agent.listeners.FeatureCheckUtils;
import com.wavefront.agent.listeners.tracing.SpanUtils;
import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor;
import com.wavefront.agent.sampler.SpanSampler;
import com.wavefront.common.TaggedMetricName;
import com.wavefront.common.Utils;
import com.wavefront.data.ReportableEntityType;
import com.wavefront.dto.SourceTag;
import com.wavefront.ingester.ReportableEntityDecoder;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import wavefront.report.ReportEvent;
import wavefront.report.ReportLog;
import wavefront.report.ReportPoint;
import wavefront.report.ReportSourceTag;
import wavefront.report.Span;
import wavefront.report.SpanLogs;

@ChannelHandler.Sharable
public class WavefrontPortUnificationHandler
extends AbstractLineDelimitedHandler {
    @Nullable
    private final SharedGraphiteHostAnnotator annotator;
    @Nullable
    private final Supplier<ReportableEntityPreprocessor> preprocessorSupplier;
    private final ReportableEntityDecoder<String, ReportPoint> wavefrontDecoder;
    private final ReportableEntityDecoder<String, ReportSourceTag> sourceTagDecoder;
    private final ReportableEntityDecoder<String, ReportEvent> eventDecoder;
    private final ReportableEntityDecoder<String, ReportPoint> histogramDecoder;
    private final ReportableEntityDecoder<String, Span> spanDecoder;
    private final ReportableEntityDecoder<JsonNode, SpanLogs> spanLogsDecoder;
    private final ReportableEntityDecoder<String, ReportLog> logDecoder;
    private final ReportableEntityHandler<ReportPoint, String> wavefrontHandler;
    private final Supplier<ReportableEntityHandler<ReportPoint, String>> histogramHandlerSupplier;
    private final Supplier<ReportableEntityHandler<ReportSourceTag, SourceTag>> sourceTagHandlerSupplier;
    private final Supplier<ReportableEntityHandler<Span, String>> spanHandlerSupplier;
    private final Supplier<ReportableEntityHandler<SpanLogs, String>> spanLogsHandlerSupplier;
    private final Supplier<ReportableEntityHandler<ReportEvent, ReportEvent>> eventHandlerSupplier;
    private final Supplier<ReportableEntityHandler<ReportLog, ReportLog>> logHandlerSupplier;
    private final Supplier<Boolean> histogramDisabled;
    private final Supplier<Boolean> traceDisabled;
    private final Supplier<Boolean> spanLogsDisabled;
    private final Supplier<Boolean> logsDisabled;
    private final SpanSampler sampler;
    private final Supplier<Counter> receivedSpansTotal;
    private final Supplier<Counter> discardedHistograms;
    private final Supplier<Counter> discardedSpans;
    private final Supplier<Counter> discardedSpanLogs;
    private final Supplier<Counter> discardedSpansBySampler;
    private final Supplier<Counter> discardedSpanLogsBySampler;
    private final LoadingCache<DataFormat, Counter> receivedLogsCounter;
    private final LoadingCache<DataFormat, Counter> discardedLogsCounter;

    public WavefrontPortUnificationHandler(String handle, TokenAuthenticator tokenAuthenticator, HealthCheckManager healthCheckManager, Map<ReportableEntityType, ReportableEntityDecoder<?, ?>> decoders, ReportableEntityHandlerFactory handlerFactory, @Nullable SharedGraphiteHostAnnotator annotator, @Nullable Supplier<ReportableEntityPreprocessor> preprocessor, Supplier<Boolean> histogramDisabled, Supplier<Boolean> traceDisabled, Supplier<Boolean> spanLogsDisabled, SpanSampler sampler, Supplier<Boolean> logsDisabled) {
        super(tokenAuthenticator, healthCheckManager, handle);
        this.wavefrontDecoder = decoders.get(ReportableEntityType.POINT);
        this.annotator = annotator;
        this.preprocessorSupplier = preprocessor;
        this.wavefrontHandler = handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.POINT, handle));
        this.histogramDecoder = decoders.get(ReportableEntityType.HISTOGRAM);
        this.sourceTagDecoder = decoders.get(ReportableEntityType.SOURCE_TAG);
        this.spanDecoder = decoders.get(ReportableEntityType.TRACE);
        this.spanLogsDecoder = decoders.get(ReportableEntityType.TRACE_SPAN_LOGS);
        this.eventDecoder = decoders.get(ReportableEntityType.EVENT);
        this.logDecoder = decoders.get(ReportableEntityType.LOGS);
        this.histogramHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.HISTOGRAM, handle)));
        this.sourceTagHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.SOURCE_TAG, handle)));
        this.spanHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.TRACE, handle)));
        this.spanLogsHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.TRACE_SPAN_LOGS, handle)));
        this.eventHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.EVENT, handle)));
        this.logHandlerSupplier = Utils.lazySupplier(() -> handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.LOGS, handle)));
        this.histogramDisabled = histogramDisabled;
        this.traceDisabled = traceDisabled;
        this.spanLogsDisabled = spanLogsDisabled;
        this.logsDisabled = logsDisabled;
        this.sampler = sampler;
        this.discardedHistograms = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("histogram", "", "discarded_points")));
        this.discardedSpans = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("spans." + handle, "", "discarded")));
        this.discardedSpanLogs = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("spanLogs." + handle, "", "discarded")));
        this.discardedSpansBySampler = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("spans." + handle, "", "sampler.discarded")));
        this.discardedSpanLogsBySampler = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("spanLogs." + handle, "", "sampler.discarded")));
        this.receivedSpansTotal = Utils.lazySupplier(() -> Metrics.newCounter((MetricName)new MetricName("spans." + handle, "", "received.total")));
        this.receivedLogsCounter = Caffeine.newBuilder().build(format -> Metrics.newCounter((MetricName)new TaggedMetricName("logs." + handle, "received.total", new String[]{"format", format.name().toLowerCase()})));
        this.discardedLogsCounter = Caffeine.newBuilder().build(format -> Metrics.newCounter((MetricName)new TaggedMetricName("logs." + handle, "discarded", new String[]{"format", format.name().toLowerCase()})));
    }

    @Override
    protected DataFormat getFormat(FullHttpRequest httpRequest) {
        return DataFormat.parse(URLEncodedUtils.parse((URI)URI.create(httpRequest.uri()), (Charset)CharsetUtil.UTF_8).stream().filter(x -> x.getName().equals("format") || x.getName().equals("f")).map(NameValuePair::getValue).findFirst().orElse(null));
    }

    @Override
    protected void handleHttpMessage(ChannelHandlerContext ctx, FullHttpRequest request) {
        StringBuilder out = new StringBuilder();
        DataFormat format = this.getFormat(request);
        if (format == DataFormat.HISTOGRAM && FeatureCheckUtils.isFeatureDisabled(this.histogramDisabled, "Ingested point discarded because histogram feature has not been enabled for your account", this.discardedHistograms.get(), out, request) || format == DataFormat.SPAN_LOG && FeatureCheckUtils.isFeatureDisabled(this.spanLogsDisabled, "Ingested span log discarded because this feature has not been enabled for your account.", this.discardedSpanLogs.get(), out, request)) {
            ChannelUtils.writeHttpResponse(ctx, HttpResponseStatus.FORBIDDEN, (Object)out, (HttpMessage)request);
            return;
        }
        if (format == DataFormat.SPAN && FeatureCheckUtils.isFeatureDisabled(this.traceDisabled, "Ingested span discarded because distributed tracing feature has not been enabled for your account.", this.discardedSpans.get(), out, request)) {
            this.receivedSpansTotal.get().inc(this.discardedSpans.get().count());
            ChannelUtils.writeHttpResponse(ctx, HttpResponseStatus.FORBIDDEN, (Object)out, (HttpMessage)request);
            return;
        }
        if (LogsUtil.LOGS_DATA_FORMATS.contains((Object)format) && FeatureCheckUtils.isFeatureDisabled(this.logsDisabled, "Ingested logs discarded because this feature has not been enabled for your account.", (Counter)this.discardedLogsCounter.get((Object)format), out, request)) {
            ((Counter)this.receivedLogsCounter.get((Object)format)).inc(((Counter)this.discardedLogsCounter.get((Object)format)).count());
            ChannelUtils.writeHttpResponse(ctx, HttpResponseStatus.FORBIDDEN, (Object)out, (HttpMessage)request);
            return;
        }
        super.handleHttpMessage(ctx, request);
    }

    @Override
    protected void processLine(ChannelHandlerContext ctx, @Nonnull String message, @Nullable DataFormat format) {
        if (message.contains("\u0004")) {
            this.wavefrontHandler.reject(message, "'EOT' character is not allowed!");
        }
        DataFormat dataFormat = format == null ? DataFormat.autodetect(message) : format;
        switch (dataFormat) {
            case SOURCE_TAG: {
                ReportableEntityHandler<ReportSourceTag, SourceTag> sourceTagHandler = this.sourceTagHandlerSupplier.get();
                if (sourceTagHandler == null || this.sourceTagDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept sourceTag-formatted data!");
                    return;
                }
                ArrayList output = new ArrayList(1);
                try {
                    this.sourceTagDecoder.decode((Object)message, output, "dummy");
                    for (ReportSourceTag tag : output) {
                        sourceTagHandler.report(tag);
                    }
                }
                catch (Exception e) {
                    sourceTagHandler.reject(message, ChannelUtils.formatErrorMessage("WF-300 Cannot parse sourceTag: \"" + message + "\"", e, ctx));
                }
                return;
            }
            case EVENT: {
                ReportableEntityHandler<ReportEvent, ReportEvent> eventHandler = this.eventHandlerSupplier.get();
                if (eventHandler == null || this.eventDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept event data!");
                    return;
                }
                ArrayList events = new ArrayList(1);
                try {
                    this.eventDecoder.decode((Object)message, events, "dummy");
                    for (ReportEvent event : events) {
                        eventHandler.report(event);
                    }
                }
                catch (Exception e) {
                    eventHandler.reject(message, ChannelUtils.formatErrorMessage("WF-300 Cannot parse event: \"" + message + "\"", e, ctx));
                }
                return;
            }
            case SPAN: {
                ReportableEntityHandler<Span, String> spanHandler = this.spanHandlerSupplier.get();
                if (spanHandler == null || this.spanDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept tracing data (spans)!");
                    return;
                }
                message = this.annotator == null ? message : this.annotator.apply(ctx, message);
                this.receivedSpansTotal.get().inc();
                SpanUtils.preprocessAndHandleSpan(message, this.spanDecoder, spanHandler, spanHandler::report, this.preprocessorSupplier, ctx, span -> this.sampler.sample((Span)span, this.discardedSpansBySampler.get()));
                return;
            }
            case SPAN_LOG: {
                if (FeatureCheckUtils.isFeatureDisabled(this.spanLogsDisabled, "Ingested span log discarded because this feature has not been enabled for your account.", this.discardedSpanLogs.get())) {
                    return;
                }
                ReportableEntityHandler<SpanLogs, String> spanLogsHandler = this.spanLogsHandlerSupplier.get();
                if (spanLogsHandler == null || this.spanLogsDecoder == null || this.spanDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept tracing data (span logs)!");
                    return;
                }
                SpanUtils.handleSpanLogs(message, this.spanLogsDecoder, this.spanDecoder, spanLogsHandler, this.preprocessorSupplier, ctx, span -> this.sampler.sample((Span)span, this.discardedSpanLogsBySampler.get()));
                return;
            }
            case HISTOGRAM: {
                if (FeatureCheckUtils.isFeatureDisabled(this.histogramDisabled, "Ingested point discarded because histogram feature has not been enabled for your account", this.discardedHistograms.get())) {
                    return;
                }
                ReportableEntityHandler<ReportPoint, String> histogramHandler = this.histogramHandlerSupplier.get();
                if (histogramHandler == null || this.histogramDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept histogram-formatted data!");
                    return;
                }
                message = this.annotator == null ? message : this.annotator.apply(ctx, message);
                WavefrontPortUnificationHandler.preprocessAndHandlePoint(message, this.histogramDecoder, histogramHandler, this.preprocessorSupplier, ctx, "histogram");
                return;
            }
            case LOGS_JSON_ARR: 
            case LOGS_JSON_LINES: 
            case LOGS_JSON_CLOUDWATCH: {
                ((Counter)this.receivedLogsCounter.get((Object)format)).inc();
                if (FeatureCheckUtils.isFeatureDisabled(this.logsDisabled, "Ingested logs discarded because this feature has not been enabled for your account.", (Counter)this.discardedLogsCounter.get((Object)format))) {
                    return;
                }
                ReportableEntityHandler<ReportLog, ReportLog> logHandler = this.logHandlerSupplier.get();
                if (logHandler == null || this.logDecoder == null) {
                    this.wavefrontHandler.reject(message, "Port is not configured to accept log data!");
                    return;
                }
                logHandler.setLogFormat(format);
                message = this.annotator == null ? message : this.annotator.apply(ctx, message, true);
                WavefrontPortUnificationHandler.preprocessAndHandleLog(message, this.logDecoder, logHandler, this.preprocessorSupplier, ctx);
                return;
            }
        }
        message = this.annotator == null ? message : this.annotator.apply(ctx, message);
        WavefrontPortUnificationHandler.preprocessAndHandlePoint(message, this.wavefrontDecoder, this.wavefrontHandler, this.preprocessorSupplier, ctx, "metric");
    }

    public static void preprocessAndHandlePoint(String message, ReportableEntityDecoder<String, ReportPoint> decoder, ReportableEntityHandler<ReportPoint, String> handler, @Nullable Supplier<ReportableEntityPreprocessor> preprocessorSupplier, @Nullable ChannelHandlerContext ctx, String type) {
        ReportableEntityPreprocessor preprocessor = preprocessorSupplier == null ? null : preprocessorSupplier.get();
        String[] messageHolder = new String[1];
        if (preprocessor != null) {
            message = preprocessor.forPointLine().transform(message);
            if (!preprocessor.forPointLine().filter(message, messageHolder)) {
                if (messageHolder[0] != null) {
                    handler.reject((ReportPoint)null, message);
                } else {
                    handler.block(null, message);
                }
                return;
            }
        }
        ArrayList output = new ArrayList(1);
        try {
            decoder.decode((Object)message, output, "dummy");
        }
        catch (Exception e) {
            handler.reject(message, ChannelUtils.formatErrorMessage("WF-300 Cannot parse " + type + ": \"" + message + "\"", e, ctx));
            return;
        }
        for (ReportPoint object : output) {
            if (preprocessor != null) {
                preprocessor.forReportPoint().transform(object);
                if (!preprocessor.forReportPoint().filter(object, messageHolder)) {
                    if (messageHolder[0] != null) {
                        handler.reject(object, messageHolder[0]);
                    } else {
                        handler.block(object);
                    }
                    return;
                }
            }
            handler.report(object);
        }
    }

    public static void preprocessAndHandleLog(String message, ReportableEntityDecoder<String, ReportLog> decoder, ReportableEntityHandler<ReportLog, ReportLog> handler, @Nullable Supplier<ReportableEntityPreprocessor> preprocessorSupplier, @Nullable ChannelHandlerContext ctx) {
        ReportableEntityPreprocessor preprocessor = preprocessorSupplier == null ? null : preprocessorSupplier.get();
        String[] messageHolder = new String[1];
        if (preprocessor != null) {
            message = preprocessor.forPointLine().transform(message);
            if (!preprocessor.forPointLine().filter(message, messageHolder)) {
                if (messageHolder[0] != null) {
                    handler.reject((ReportLog)null, message);
                } else {
                    handler.block(null, message);
                }
                return;
            }
        }
        ArrayList output = new ArrayList(1);
        try {
            decoder.decode((Object)message, output, "dummy");
        }
        catch (Exception e) {
            handler.reject(message, ChannelUtils.formatErrorMessage("WF-600 Cannot parse Log: \"" + message + "\"", e, ctx));
            return;
        }
        if (output.get(0) == null) {
            handler.reject(message, ChannelUtils.formatErrorMessage("WF-600 Cannot parse Log: \"" + message + "\"", null, ctx));
            return;
        }
        for (ReportLog object : output) {
            if (preprocessor != null) {
                preprocessor.forReportLog().transform(object);
                if (!preprocessor.forReportLog().filter(object, messageHolder)) {
                    if (messageHolder[0] != null) {
                        handler.reject(object, messageHolder[0]);
                    } else {
                        handler.block(object);
                    }
                    return;
                }
            }
            handler.report(object);
        }
    }
}

