package mydataharbor.sink;

import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import mydataharbor.IDataSink;
import mydataharbor.elasticsearch.common.sink.ElasticsearchSinkConfig;
import mydataharbor.elasticsearch.common.sink.ElasticsearchSinkReq;
import mydataharbor.exception.ResetException;
import mydataharbor.setting.BaseSettingContext;
import mydataharbor.sink.es.IEsClient;
import mydataharbor.sink.exception.EsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:mydataharbor/sink/AbstractEsSink.class */
/**
 * Base class for Elasticsearch data sinks.
 *
 * <p>The concrete subclass supplies the client through {@link #initEsClient(ElasticsearchSinkConfig)};
 * this class delegates single and batch writes to that client and translates failures into
 * {@link ResetException} (connection failures) or {@link EsException} (everything else).
 */
public abstract class AbstractEsSink implements IDataSink<ElasticsearchSinkReq, BaseSettingContext> {
    private static final Logger log = LoggerFactory.getLogger(AbstractEsSink.class);

    /** Upper bound on cause-chain hops, so a cyclic cause chain cannot loop forever. */
    private static final int MAX_CAUSE_DEPTH = 16;

    private final IEsClient esClient;

    /**
     * Creates the client and, when the write-index configuration asks for auto-creation,
     * creates the target index if it does not exist yet.
     *
     * <p>NOTE(review): {@link #initEsClient(ElasticsearchSinkConfig)} is an overridable method
     * invoked from this constructor, so it runs before the subclass's own fields are initialized.
     * Implementations must rely only on the passed configuration.
     *
     * @param elasticsearchSinkConfig sink configuration, including the write-index settings
     */
    public AbstractEsSink(ElasticsearchSinkConfig elasticsearchSinkConfig) {
        IEsClient client = initEsClient(elasticsearchSinkConfig);
        this.esClient = client;
        try {
            ensureIndexExists(client, elasticsearchSinkConfig);
        } catch (RuntimeException e) {
            // The instance never escapes when the constructor fails, so nobody else could
            // close the client: release it here instead of leaking it.
            if (client != null) {
                try {
                    client.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Builds the Elasticsearch client used by this sink. Called once, from the constructor.
     *
     * @param elasticsearchSinkConfig sink configuration
     * @return the client that all writes are delegated to
     */
    public abstract IEsClient initEsClient(ElasticsearchSinkConfig elasticsearchSinkConfig);

    /**
     * Writes a single record.
     *
     * @param elasticsearchSinkReq the record to write
     * @param baseSettingContext   setting context (not used by this implementation)
     * @return a successful, committed result carrying the client's return value
     * @throws ResetException when the failure is, or is caused by, a {@link ConnectException}
     * @throws EsException    for any other failure reported by the client
     */
    public IDataSink.WriterResult write(ElasticsearchSinkReq elasticsearchSinkReq, BaseSettingContext baseSettingContext) throws ResetException {
        IDataSink.WriterResult.WriterResultBuilder builder = IDataSink.WriterResult.builder();
        try {
            Object write = this.esClient.write(elasticsearchSinkReq);
            log.info("写入结果：{}", write);
            builder.writeReturn(write);
            return builder.success(true).commit(true).msg("ok").build();
        } catch (Exception e) {
            if (isConnectFailure(e)) {
                throw new ResetException("连接异常", e);
            }
            throw new EsException("请求es发生异常", e);
        }
    }

    /**
     * Writes a batch of records through the client's batch API.
     *
     * @param list               the records to write
     * @param baseSettingContext setting context (not used by this implementation)
     * @return a successful, committed result carrying the client's return value
     * @throws ResetException when the failure is, or is caused by, a {@link ConnectException}
     * @throws EsException    for any other failure reported by the client
     */
    public IDataSink.WriterResult write(List<ElasticsearchSinkReq> list, BaseSettingContext baseSettingContext) throws ResetException {
        IDataSink.WriterResult.WriterResultBuilder builder = IDataSink.WriterResult.builder();
        try {
            builder.writeReturn(this.esClient.batchWrite(list));
            return builder.success(true).commit(true).msg("ok").build();
        } catch (Exception e) {
            log.error("写入es发生异常！", e);
            if (isConnectFailure(e)) {
                throw new ResetException("写入es发生异常！", e);
            }
            // Previously thrown with an empty message, which made the failure hard to identify.
            throw new EsException("批量写入es发生异常", e);
        }
    }

    /**
     * Closes the underlying client, if one was created.
     *
     * @throws IOException if the client fails to close
     */
    public void close() throws IOException {
        if (this.esClient != null) {
            this.esClient.close();
        }
    }

    /**
     * Creates the configured write index when auto-creation is enabled and the index is missing.
     * The existence check is repeated under a class-wide lock so that sinks constructed
     * concurrently in the same class loader do not both attempt the creation.
     */
    private static void ensureIndexExists(IEsClient client, ElasticsearchSinkConfig config) {
        if (!config.getWriteIndexConfig().isAutoCreate()) {
            return;
        }
        if (client.checkIndexExist(config.getWriteIndexConfig().getIndexName())) {
            return;
        }
        synchronized (AbstractEsSink.class) {
            if (!client.checkIndexExist(config.getWriteIndexConfig().getIndexName())) {
                client.createIndex(
                        config.getWriteIndexConfig().getIndexName(),
                        config.getWriteIndexConfig().getSettings(),
                        config.getWriteIndexConfig().getMapping());
            }
        }
    }

    /**
     * Tells whether {@code failure} or any exception in its cause chain is a
     * {@link ConnectException}. Client implementations may wrap the low-level network error,
     * so checking only the outermost exception would miss those cases.
     */
    private static boolean isConnectFailure(Throwable failure) {
        Throwable current = failure;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof ConnectException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
