package org.apache.flume.sink.solr.morphline;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineCompilationException;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.FaultTolerance;
import org.kitesdk.morphline.base.Notifications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.class */
public class MorphlineHandlerImpl implements MorphlineHandler {
    private MorphlineContext morphlineContext;
    private Command morphline;
    private Command finalChild;
    private String morphlineFileAndId;
    private Timer mappingTimer;
    private Meter numRecords;
    private Meter numFailedRecords;
    private Meter numExceptionRecords;
    public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
    public static final String MORPHLINE_ID_PARAM = "morphlineId";
    public static final String MORPHLINE_VARIABLE_PARAM = "morphlineVariable";
    private static final Logger LOG = LoggerFactory.getLogger(MorphlineHandlerImpl.class);

    void setMorphlineContext(MorphlineContext morphlineContext) {
        this.morphlineContext = morphlineContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setFinalChild(Command command) {
        this.finalChild = command;
    }

    public void configure(Context context) {
        String string = context.getString(MORPHLINE_FILE_PARAM);
        String string2 = context.getString(MORPHLINE_ID_PARAM);
        if (string == null || string.trim().length() == 0) {
            throw new MorphlineCompilationException("Missing parameter: morphlineFile", (Config) null);
        }
        this.morphlineFileAndId = string + "@" + string2;
        if (this.morphlineContext == null) {
            this.morphlineContext = new MorphlineContext.Builder().setExceptionHandler(new FaultTolerance(context.getBoolean("isProductionMode", false).booleanValue(), context.getBoolean("isIgnoringRecoverableExceptions", false).booleanValue(), context.getString("recoverableExceptionClasses"))).setMetricRegistry(SharedMetricRegistries.getOrCreate(this.morphlineFileAndId)).build();
        }
        this.morphline = new Compiler().compile(new File(string), string2, this.morphlineContext, this.finalChild, new Config[]{ConfigFactory.parseMap(context.getSubProperties("morphlineVariable."))});
        this.mappingTimer = this.morphlineContext.getMetricRegistry().timer(MetricRegistry.name("morphline.app", new String[]{"elapsedTime"}));
        this.numRecords = this.morphlineContext.getMetricRegistry().meter(MetricRegistry.name("morphline.app", new String[]{"numRecords"}));
        this.numFailedRecords = this.morphlineContext.getMetricRegistry().meter(MetricRegistry.name("morphline.app", new String[]{"numFailedRecords"}));
        this.numExceptionRecords = this.morphlineContext.getMetricRegistry().meter(MetricRegistry.name("morphline.app", new String[]{"numExceptionRecords"}));
    }

    @Override // org.apache.flume.sink.solr.morphline.MorphlineHandler
    public void process(Event event) {
        this.numRecords.mark();
        Timer.Context time = this.mappingTimer.time();
        try {
            Record record = new Record();
            for (Map.Entry entry : event.getHeaders().entrySet()) {
                record.put((String) entry.getKey(), entry.getValue());
            }
            byte[] body = event.getBody();
            if (body != null && body.length > 0) {
                record.put("_attachment_body", body);
            }
            try {
                Notifications.notifyStartSession(this.morphline);
                if (!this.morphline.process(record)) {
                    this.numFailedRecords.mark();
                    LOG.warn("Morphline {} failed to process record: {}", this.morphlineFileAndId, record);
                }
            } catch (RuntimeException e) {
                this.numExceptionRecords.mark();
                this.morphlineContext.getExceptionHandler().handleException(e, record);
            }
        } finally {
            time.stop();
        }
    }

    @Override // org.apache.flume.sink.solr.morphline.MorphlineHandler
    public void beginTransaction() {
        Notifications.notifyBeginTransaction(this.morphline);
    }

    @Override // org.apache.flume.sink.solr.morphline.MorphlineHandler
    public void commitTransaction() {
        Notifications.notifyCommitTransaction(this.morphline);
    }

    @Override // org.apache.flume.sink.solr.morphline.MorphlineHandler
    public void rollbackTransaction() {
        Notifications.notifyRollbackTransaction(this.morphline);
    }

    @Override // org.apache.flume.sink.solr.morphline.MorphlineHandler
    public void stop() {
        Notifications.notifyShutdown(this.morphline);
    }
}
