package org.apache.seatunnel.flink.elasticsearch.sink;

import java.net.InetAddress;
import java.util.List;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.class */
public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
    private static final long serialVersionUID = 2048590860723433896L;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ElasticsearchOutputFormat.class);
    private final Config config;
    private static final String PREFIX = "es.";
    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
    private transient RequestIndexer requestIndexer;
    private transient BulkProcessor bulkProcessor;

    public ElasticsearchOutputFormat(Config config, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        this.config = config;
        this.elasticsearchSinkFunction = elasticsearchSinkFunction;
    }

    public void configure(Configuration configuration) {
        List<String> stringList = this.config.getStringList(org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS);
        Settings.Builder builder = Settings.builder();
        this.config.entrySet().forEach(entry -> {
            String str = (String) entry.getKey();
            Object unwrapped = ((ConfigValue) entry.getValue()).unwrapped();
            if (str.startsWith(PREFIX)) {
                builder.put(str.substring(PREFIX.length()), unwrapped.toString());
            }
        });
        PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(builder.build(), (Class<? extends Plugin>[]) new Class[0]);
        for (String str : stringList) {
            try {
                preBuiltTransportClient.addTransportAddresses(new TransportAddress(InetAddress.getByName(str.split(ParameterizedMessage.ERROR_MSG_SEPARATOR)[0]), Integer.parseInt(str.split(ParameterizedMessage.ERROR_MSG_SEPARATOR)[1])));
            } catch (Exception e) {
                LOGGER.warn("Host '{}' parse failed.", str, e);
            }
        }
        this.bulkProcessor = BulkProcessor.builder(preBuiltTransportClient, new BulkProcessor.Listener() { // from class: org.apache.seatunnel.flink.elasticsearch.sink.ElasticsearchOutputFormat.1
            @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
            public void beforeBulk(long j, BulkRequest bulkRequest) {
            }

            @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
            public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            }

            @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
            public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            }
        }).build();
        this.requestIndexer = new RequestIndexer() { // from class: org.apache.seatunnel.flink.elasticsearch.sink.ElasticsearchOutputFormat.2
            @Override // org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
            public void add(DeleteRequest... deleteRequestArr) {
            }

            @Override // org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
            public void add(IndexRequest... indexRequestArr) {
                for (IndexRequest indexRequest : indexRequestArr) {
                    ElasticsearchOutputFormat.this.bulkProcessor.add(indexRequest);
                }
            }

            @Override // org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
            public void add(UpdateRequest... updateRequestArr) {
            }
        };
    }

    public void open(int i, int i2) {
    }

    public void writeRecord(T t) {
        this.elasticsearchSinkFunction.process(t, getRuntimeContext(), this.requestIndexer);
    }

    public void close() {
        this.bulkProcessor.flush();
    }
}
