package io.confluent.logevents.connect;

import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventLogger;
import io.confluent.telemetry.events.EventUtils;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/logevents/connect/LogEventsKafkaEmitter.class */
public class LogEventsKafkaEmitter implements LogEventsEmitter {
    private static final Logger log = LoggerFactory.getLogger(LogEventsKafkaEmitter.class);
    private EventLogger eventLogger;
    private LogEventsConfig logEventsConfig;
    private String dataContentType;

    public LogEventsKafkaEmitter(EventLogger eventLogger, LogEventsConfig logEventsConfig, String str) {
        this.eventLogger = null;
        this.logEventsConfig = null;
        this.eventLogger = eventLogger;
        this.logEventsConfig = logEventsConfig;
        this.dataContentType = str;
    }

    public LogEventsKafkaEmitter() {
        this.eventLogger = null;
        this.logEventsConfig = null;
    }

    public synchronized void start(Map<String, ?> map) {
        if (this.eventLogger != null) {
            log.warn("Skipping reinitialization of {}", this);
            return;
        }
        this.logEventsConfig = new LogEventsConfig(map);
        if (!this.logEventsConfig.getBoolean(LogEventsConfig.LOGGER_ENABLE_CONFIG).booleanValue()) {
            log.info("Connect Log Events aren't enabled.");
            return;
        }
        log.info("Initializing {}", this);
        populateDataContentType();
        this.eventLogger = new EventLogger();
        this.eventLogger.configure(this.logEventsConfig.toEventLoggerConfig());
    }

    public synchronized LogEventsConfig logEventsConfig() {
        if (this.logEventsConfig != null) {
            return this.logEventsConfig;
        }
        log.error("logEventsConfig instance can't be returned without starting logEventsKafkaEmitter instance.");
        throw new IllegalStateException("logEventsConfig can't be accessed without starting logEventsKafkaEmitter instance");
    }

    @Override // io.confluent.logevents.connect.LogEventsEmitter
    public void emit(ConnectLogEntry connectLogEntry, String str, String str2, String str3) {
        if (this.eventLogger == null) {
            log.trace("Skipping emitting the Connect Log Entry: {}, with source: {}, subject: {}, type: {}", new Object[]{connectLogEntry, str2, str, str3});
            return;
        }
        Event eventBuilder = eventBuilder(connectLogEntry, str, str2, str3);
        try {
            this.eventLogger.log(eventBuilder);
        } catch (RuntimeException e) {
            log.error("Unable to emit event: {}", EventUtils.toJson(eventBuilder), e);
        }
    }

    public synchronized void stop() {
        if (this.eventLogger == null) {
            return;
        }
        log.info("Stopping {}, closing event logger", this);
        try {
            this.eventLogger.close();
            this.eventLogger = null;
        } catch (Exception e) {
            log.error("Error closing the event logger in {}", this, e);
        }
    }

    public String toString() {
        return LogEventsKafkaEmitter.class.getSimpleName();
    }

    EventLogger getEventLogger() {
        return this.eventLogger;
    }

    private Event eventBuilder(ConnectLogEntry connectLogEntry, String str, String str2, String str3) {
        return new Event().setId(UUID.randomUUID().toString()).setTime(Instant.now().atOffset(ZoneOffset.UTC)).setData(this.dataContentType, EventUtils.protoToBytes(connectLogEntry, this.dataContentType)).setSource(str2).setSubject(str).setType(str3).setExtension("route", this.logEventsConfig.getString(LogEventsConfig.LOGGER_LOG_EVENTS_TOPIC_CONFIG));
    }

    private void populateDataContentType() {
        String string = this.logEventsConfig.getString(LogEventsConfig.LOGGER_CLOUD_EVENT_ENCODING_CONFIG);
        boolean z = -1;
        switch (string.hashCode()) {
            case -1388966911:
                if (string.equals("binary")) {
                    z = false;
                    break;
                }
                break;
            case 185106769:
                if (string.equals("structured")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.dataContentType = "application/protobuf";
                return;
            case true:
                this.dataContentType = "application/json";
                return;
            default:
                throw new RuntimeException("unknown encoding " + string);
        }
    }
}
