/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.logsharvesting;

import com.wavefront.agent.logsharvesting.FilebeatMessage;
import com.wavefront.agent.logsharvesting.LogsIngester;
import com.wavefront.agent.logsharvesting.MalformedMessageException;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import io.netty.channel.ChannelHandlerContext;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.logstash.beats.IMessageListener;
import org.logstash.beats.Message;

public class FilebeatIngester
implements IMessageListener {
    protected static final Logger logger = Logger.getLogger(LogsIngester.class.getCanonicalName());
    private final LogsIngester logsIngester;
    private final Counter received;
    private final Counter malformed;
    private final Histogram drift;
    private final Supplier<Long> currentMillis;

    public FilebeatIngester(LogsIngester logsIngester, Supplier<Long> currentMillis) {
        this.logsIngester = logsIngester;
        this.received = Metrics.newCounter((MetricName)new MetricName("logsharvesting", "", "filebeat-received"));
        this.malformed = Metrics.newCounter((MetricName)new MetricName("logsharvesting", "", "filebeat-malformed"));
        this.drift = Metrics.newHistogram((MetricName)new MetricName("logsharvesting", "", "filebeat-drift"));
        this.currentMillis = currentMillis;
    }

    @Override
    public void onNewMessage(ChannelHandlerContext ctx, Message message) {
        FilebeatMessage filebeatMessage;
        this.received.inc();
        try {
            filebeatMessage = new FilebeatMessage(message);
        }
        catch (MalformedMessageException exn) {
            logger.severe("Malformed message received from filebeat, dropping (" + exn.getMessage() + ")");
            this.malformed.inc();
            return;
        }
        if (filebeatMessage.getTimestampMillis() != null) {
            this.drift.update(this.currentMillis.get() - filebeatMessage.getTimestampMillis());
        }
        this.logsIngester.ingestLog(filebeatMessage);
    }

    @Override
    public void onNewConnection(ChannelHandlerContext ctx) {
        logger.info("New filebeat connection.");
    }

    @Override
    public void onConnectionClose(ChannelHandlerContext ctx) {
        logger.info("Filebeat connection closed.");
    }

    @Override
    public void onException(ChannelHandlerContext ctx, Throwable cause) {
        logger.log(Level.SEVERE, "Caught error processing beats data.", cause);
    }

    @Override
    public void onChannelInitializeException(ChannelHandlerContext ctx, Throwable cause) {
        logger.log(Level.SEVERE, "Caught initializing beats data processor.", cause);
    }
}

