package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.elasticsearch.action.DocWriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchSinkTask.class */
public class ElasticsearchSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
    private DataConverter converter;
    private ElasticsearchClient client;
    private ElasticsearchSinkConnectorConfig config;
    private ErrantRecordReporter reporter;
    private Set<String> existingMappings;
    private Set<String> indexCache;

    public void start(Map<String, String> map) {
        start(map, null);
    }

    protected void start(Map<String, String> map, ElasticsearchClient elasticsearchClient) {
        log.info("Starting ElasticsearchSinkTask.");
        this.config = new ElasticsearchSinkConnectorConfig(map);
        this.converter = new DataConverter(this.config);
        this.existingMappings = new HashSet();
        this.indexCache = new HashSet();
        this.reporter = null;
        try {
            if (this.context.errantRecordReporter() == null) {
                log.info("Errant record reporter not configured.");
            }
            this.reporter = this.context.errantRecordReporter();
        } catch (NoClassDefFoundError | NoSuchMethodError e) {
            log.warn("AK versions prior to 2.6 do not support the errant record reporter.");
        }
        this.client = elasticsearchClient != null ? elasticsearchClient : new ElasticsearchClient(this.config, this.reporter);
        log.info("Started ElasticsearchSinkTask.");
    }

    public void put(Collection<SinkRecord> collection) throws ConnectException {
        log.debug("Putting {} records to Elasticsearch.", Integer.valueOf(collection.size()));
        for (SinkRecord sinkRecord : collection) {
            if (shouldSkipRecord(sinkRecord)) {
                logTrace("Ignoring {} with null value.", sinkRecord);
                reportBadRecord(sinkRecord, new ConnectException("Cannot write null valued record."));
            } else {
                logTrace("Writing {} to Elasticsearch.", sinkRecord);
                ensureIndexExists(convertTopicToIndexName(sinkRecord.topic()));
                checkMapping(sinkRecord);
                tryWriteRecord(sinkRecord);
            }
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        log.debug("Flushing data to Elasticsearch with the following offsets: {}", map);
        try {
            this.client.flush();
        } catch (IllegalStateException e) {
            log.debug("Tried to flush data to Elasticsearch, but BulkProcessor is already closed.");
        }
    }

    public void stop() {
        log.debug("Stopping Elasticsearch client.");
        this.client.close();
    }

    public String version() {
        return Version.getVersion();
    }

    private void checkMapping(SinkRecord sinkRecord) {
        String convertTopicToIndexName = convertTopicToIndexName(sinkRecord.topic());
        if (this.config.shouldIgnoreSchema(sinkRecord.topic()) || this.existingMappings.contains(convertTopicToIndexName)) {
            return;
        }
        if (!this.client.hasMapping(convertTopicToIndexName)) {
            this.client.createMapping(convertTopicToIndexName, sinkRecord.valueSchema());
        }
        log.debug("Caching mapping for index '{}' locally.", convertTopicToIndexName);
        this.existingMappings.add(convertTopicToIndexName);
    }

    private String convertTopicToIndexName(String str) {
        String lowerCase = str.toLowerCase();
        if (lowerCase.length() > 255) {
            lowerCase = lowerCase.substring(0, 255);
        }
        if (lowerCase.startsWith("-") || lowerCase.startsWith("_")) {
            lowerCase = lowerCase.substring(1);
        }
        if (lowerCase.equals(".") || lowerCase.equals("..")) {
            lowerCase = lowerCase.replace(".", "dot");
            log.warn("Elasticsearch cannot have indices named {}. Index will be named {}.", str, lowerCase);
        }
        if (!str.equals(lowerCase)) {
            log.trace("Topic '{}' was translated to index '{}'.", str, lowerCase);
        }
        return lowerCase;
    }

    private void ensureIndexExists(String str) {
        if (this.indexCache.contains(str)) {
            return;
        }
        log.info("Creating index {}.", str);
        this.client.createIndex(str);
        this.indexCache.add(str);
    }

    private void logTrace(String str, SinkRecord sinkRecord) {
        if (log.isTraceEnabled()) {
            log.trace(str, recordString(sinkRecord));
        }
    }

    private void reportBadRecord(SinkRecord sinkRecord, Throwable th) {
        if (this.reporter != null) {
            this.reporter.report(sinkRecord, th);
        }
    }

    private boolean shouldSkipRecord(SinkRecord sinkRecord) {
        return sinkRecord.value() == null && this.config.behaviorOnNullValues() == ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.IGNORE;
    }

    private void tryWriteRecord(SinkRecord sinkRecord) {
        DocWriteRequest<?> docWriteRequest = null;
        try {
            docWriteRequest = this.converter.convertRecord(sinkRecord, convertTopicToIndexName(sinkRecord.topic()));
        } catch (DataException e) {
            reportBadRecord(sinkRecord, e);
            if (!this.config.dropInvalidMessage()) {
                throw e;
            }
            log.error("Can't convert {}.", recordString(sinkRecord), e);
        }
        if (docWriteRequest != null) {
            log.trace("Adding {} to bulk processor.", recordString(sinkRecord));
            this.client.index(sinkRecord, docWriteRequest);
        }
    }

    private static String recordString(SinkRecord sinkRecord) {
        return String.format("record from topic=%s partition=%s offset=%s", sinkRecord.topic(), sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
    }
}
