package org.apache.seatunnel.flink.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.StringTemplate;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigMergeable;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

/* loaded from: input_file:org/apache/seatunnel/flink/sink/Elasticsearch.class */
/**
 * Sink plugin that writes Flink {@link Row} records to Elasticsearch, usable both as a
 * streaming sink ({@link #outputStream}) and as a batch sink ({@link #outputBatch}).
 *
 * <p>Configuration keys read by this class:
 * <ul>
 *   <li>{@code hosts} (required) - list of {@code host:port} entries;</li>
 *   <li>{@code index} - index name template, defaults to {@code seatunnel};</li>
 *   <li>{@code index_type} - document type, defaults to {@code log};</li>
 *   <li>{@code index_time_format} - passed to {@link StringTemplate#substitute} together
 *       with {@code index}, defaults to {@code yyyy.MM.dd};</li>
 *   <li>{@code parallelism} (optional) - parallelism applied to the created sink.</li>
 * </ul>
 */
public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
    private static final long serialVersionUID = 8445868321245456793L;
    private static final int DEFAULT_CONFIG_SIZE = 3;
    private static final String PARALLELISM = "parallelism";
    private static final String HOSTS = "hosts";
    private static final String INDEX = "index";
    private static final String INDEX_TYPE = "index_type";
    private static final String INDEX_TIME_FORMAT = "index_time_format";
    /** Separator between host name and port inside a {@code hosts} entry. */
    private static final String HOST_PORT_SEPARATOR = ":";

    private Config config;
    private String indexName;

    /**
     * Stores the plugin configuration.
     *
     * @param config configuration to use for this sink
     */
    @Override
    public void setConfig(Config config) {
        this.config = config;
    }

    /**
     * Returns the configuration currently held by this sink.
     *
     * @return the plugin configuration
     */
    @Override
    public Config getConfig() {
        return this.config;
    }

    /**
     * Verifies that the mandatory {@code hosts} option is present.
     *
     * @return the result produced by {@link CheckConfigUtil#checkAllExists}
     */
    @Override
    public CheckResult checkConfig() {
        return CheckConfigUtil.checkAllExists(this.config, HOSTS);
    }

    /**
     * Completes the configuration with default values for {@code index},
     * {@code index_type} and {@code index_time_format}; user-supplied values win.
     *
     * @param flinkEnvironment the Flink environment (not used here)
     */
    @Override
    public void prepare(FlinkEnvironment flinkEnvironment) {
        // A plain map is used on purpose: double-brace initialisation would create an
        // anonymous HashMap subclass holding a reference to this sink instance.
        Map<String, String> defaults = new HashMap<>(DEFAULT_CONFIG_SIZE);
        defaults.put(INDEX, "seatunnel");
        defaults.put(INDEX_TYPE, "log");
        defaults.put(INDEX_TIME_FORMAT, "yyyy.MM.dd");
        this.config = this.config.withFallback(ConfigFactory.parseMap(defaults));
    }

    /**
     * Attaches an Elasticsearch sink to the given stream.
     *
     * @param flinkEnvironment the Flink environment (not used here)
     * @param dataStream stream of rows to index
     * @return the created sink, with {@code parallelism} applied when configured
     * @throws IllegalArgumentException if a {@code hosts} entry is not of the form
     *         {@code host:port} or its port is not a valid integer
     */
    @Override
    public DataStreamSink<Row> outputStream(FlinkEnvironment flinkEnvironment, DataStream<Row> dataStream) {
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String address : this.config.getStringList(HOSTS)) {
            httpHosts.add(parseHost(address));
        }
        // NOTE(review): relies on the stream's type information exposing field names - confirm.
        final String[] fieldNames = dataStream.getType().getFieldNames();
        this.indexName = resolveIndexName();

        ElasticsearchSink.Builder<Row> builder =
                new ElasticsearchSink.Builder<>(httpHosts, newSinkFunction(fieldNames));
        // Bulk requests are limited to a single action each.
        builder.setBulkFlushMaxActions(1);

        DataStreamSink<Row> sink = dataStream.addSink(builder.build());
        if (this.config.hasPath(PARALLELISM)) {
            return sink.setParallelism(this.config.getInt(PARALLELISM));
        }
        return sink;
    }

    /**
     * Attaches an Elasticsearch output format to the given data set.
     *
     * @param flinkEnvironment the Flink environment (not used here)
     * @param dataSet data set of rows to index
     * @return the created sink, with {@code parallelism} applied when configured
     */
    @Override
    public DataSink<Row> outputBatch(FlinkEnvironment flinkEnvironment, DataSet<Row> dataSet) {
        // NOTE(review): relies on the data set's type information exposing field names - confirm.
        final String[] fieldNames = dataSet.getType().getFieldNames();
        this.indexName = resolveIndexName();

        DataSink<Row> output = dataSet.output(new ElasticsearchOutputFormat(this.config, newSinkFunction(fieldNames)));
        if (this.config.hasPath(PARALLELISM)) {
            return output.setParallelism(this.config.getInt(PARALLELISM));
        }
        return output;
    }

    /** Builds the index name from the {@code index} template and {@code index_time_format}. */
    private String resolveIndexName() {
        return StringTemplate.substitute(this.config.getString(INDEX), this.config.getString(INDEX_TIME_FORMAT));
    }

    /** Creates the row-to-index-request function shared by the stream and batch sinks. */
    private ElasticsearchSinkFunction<Row> newSinkFunction(String[] fieldNames) {
        return new RowIndexRequestFunction(fieldNames, this.indexName, this.config.getString(INDEX_TYPE));
    }

    /**
     * Converts one {@code hosts} entry into an {@link HttpHost} using the default scheme.
     * Only the first two colon-separated parts are used, as before.
     *
     * @throws IllegalArgumentException if the entry has no port part; a non-numeric port
     *         surfaces as {@link NumberFormatException}
     */
    private static HttpHost parseHost(String address) {
        String[] parts = address.split(HOST_PORT_SEPARATOR);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Invalid Elasticsearch host '" + address + "': expected the form host:port");
        }
        return new HttpHost(parts[0], Integer.parseInt(parts[1]), HttpHost.DEFAULT_SCHEME_NAME);
    }

    /**
     * Turns each {@link Row} into an {@link IndexRequest} whose source maps field names to
     * the row's field values. Declared static so that only the values it needs are carried
     * with the function, not the enclosing sink and its configuration.
     */
    private static final class RowIndexRequestFunction implements ElasticsearchSinkFunction<Row> {
        private static final long serialVersionUID = 1L;

        private final String[] fieldNames;
        private final String indexName;
        private final String indexType;

        RowIndexRequestFunction(String[] fieldNames, String indexName, String indexType) {
            this.fieldNames = fieldNames;
            this.indexName = indexName;
            this.indexType = indexType;
        }

        @Override
        public void process(Row row, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            requestIndexer.add(createIndexRequest(row));
        }

        /** Builds the index request for a single row. */
        private IndexRequest createIndexRequest(Row row) {
            int arity = row.getArity();
            Map<String, Object> document = new HashMap<>(arity);
            for (int i = 0; i < arity; i++) {
                document.put(this.fieldNames[i], row.getField(i));
            }
            // NOTE(review): cast retained from the original code; it may be redundant
            // depending on the Elasticsearch client version in use.
            return ((IndexRequest) Requests.indexRequest().index(this.indexName))
                    .type(this.indexType)
                    .source(document);
        }
    }
}
