/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConnectionException;
import org.apache.pulsar.io.elasticsearch.IndexNameFormatter;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticSearchClient
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchClient.class);
    static final String[] MALFORMED_ERRORS = new String[]{"mapper_parsing_exception", "action_request_validation_exception", "illegal_argument_exception"};
    private ElasticSearchConfig config;
    private RestClient client;
    private final RandomExponentialRetry backoffRetry;
    final Set<String> indexCache = new HashSet<String>();
    final Map<String, String> topicToIndexCache = new HashMap<String, String>();
    final AtomicReference<Exception> irrecoverableError = new AtomicReference();
    private final IndexNameFormatter indexNameFormatter;

    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
        this.config = elasticSearchConfig;
        this.indexNameFormatter = this.config.getIndexName() != null ? new IndexNameFormatter(this.config.getIndexName()) : null;
        BulkProcessor.Listener bulkListener = new BulkProcessor.Listener(){

            @Override
            public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest> bulkOperationList, List<BulkProcessor.BulkOperationResult> results) {
                if (log.isTraceEnabled()) {
                    log.trace("Bulk request id={} size={}:", (Object)executionId, (Object)bulkOperationList.size());
                }
                int index = 0;
                for (BulkProcessor.BulkOperationResult result : results) {
                    Record record = bulkOperationList.get(index++).getPulsarRecord();
                    if (result.isError()) {
                        record.fail();
                        ElasticSearchClient.this.checkForIrrecoverableError(record, result);
                        continue;
                    }
                    record.ack();
                }
            }

            @Override
            public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest> bulkOperationList, Throwable throwable) {
                log.warn("Bulk request id={} failed:", (Object)executionId, (Object)throwable);
                for (BulkProcessor.BulkOperationRequest operation : bulkOperationList) {
                    Record record = operation.getPulsarRecord();
                    record.fail();
                }
            }
        };
        this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
        this.client = this.retry(() -> RestClientFactory.createClient(this.config, bulkListener), -1, "client creation");
    }

    void failed(Exception e) {
        if (this.irrecoverableError.compareAndSet(null, e)) {
            log.error("Irrecoverable error:", (Throwable)e);
        }
    }

    boolean isFailed() {
        return this.irrecoverableError.get() != null;
    }

    void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationResult result) {
        if (!result.isError()) {
            return;
        }
        String errorCause = result.getError();
        boolean isMalformed = false;
        block5: for (String error : MALFORMED_ERRORS) {
            if (!errorCause.contains(error)) continue;
            isMalformed = true;
            switch (this.config.getMalformedDocAction()) {
                case IGNORE: {
                    continue block5;
                }
                case WARN: {
                    log.warn("Ignoring malformed document index={} id={}", new Object[]{result.getIndex(), result.getDocumentId(), error});
                    continue block5;
                }
                case FAIL: {
                    log.error("Failure due to the malformed document index={} id={}", new Object[]{result.getIndex(), result.getDocumentId(), error});
                    this.failed(new Exception(error));
                }
            }
        }
        if (!isMalformed) {
            log.warn("Bulk request failed, message id=[{}] index={} error={}", new Object[]{record.getMessage().map(m -> m.getMessageId().toString()).orElse(""), result.getIndex(), result.getError()});
        }
    }

    public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {
        try {
            this.checkNotFailed();
            this.checkIndexExists((Record<GenericObject>)record);
            String indexName = this.indexName((Record<GenericObject>)record);
            String documentId = (String)idAndDoc.getLeft();
            String documentSource = (String)idAndDoc.getRight();
            BulkProcessor.BulkIndexRequest bulkIndexRequest = BulkProcessor.BulkIndexRequest.builder().index(indexName).documentId(documentId).documentSource(documentSource).record(record).build();
            this.client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
        }
        catch (Exception e) {
            log.debug("index failed id=" + (String)idAndDoc.getLeft(), (Throwable)e);
            record.fail();
            throw e;
        }
    }

    public boolean indexDocument(Record<GenericObject> record, Pair<String, String> idAndDoc) throws Exception {
        try {
            this.checkNotFailed();
            this.checkIndexExists(record);
            String indexName = this.indexName(record);
            String documentId = (String)idAndDoc.getLeft();
            String documentSource = (String)idAndDoc.getRight();
            boolean createdOrUpdated = this.client.indexDocument(indexName, documentId, documentSource);
            if (createdOrUpdated) {
                record.ack();
            } else {
                record.fail();
            }
            return createdOrUpdated;
        }
        catch (Exception ex) {
            log.error("index failed id=" + (String)idAndDoc.getLeft(), (Throwable)ex);
            record.fail();
            throw ex;
        }
    }

    public void bulkDelete(Record<GenericObject> record, String id) throws Exception {
        try {
            this.checkNotFailed();
            this.checkIndexExists(record);
            String indexName = this.indexName(record);
            BulkProcessor.BulkDeleteRequest bulkDeleteRequest = BulkProcessor.BulkDeleteRequest.builder().index(indexName).documentId(id).record(record).build();
            this.client.getBulkProcessor().appendDeleteRequest(bulkDeleteRequest);
        }
        catch (Exception e) {
            log.debug("delete failed id: {}", (Object)id, (Object)e);
            record.fail();
            throw e;
        }
    }

    public boolean deleteDocument(Record<GenericObject> record, String id) throws Exception {
        try {
            this.checkNotFailed();
            this.checkIndexExists(record);
            String indexName = this.indexName(record);
            boolean deleted = this.client.deleteDocument(indexName, id);
            if (deleted) {
                record.ack();
            } else {
                record.fail();
            }
            return deleted;
        }
        catch (Exception ex) {
            log.debug("index failed id: {}", (Object)id, (Object)ex);
            record.fail();
            throw ex;
        }
    }

    public void flush() {
        this.client.getBulkProcessor().flush();
    }

    @Override
    public void close() {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    @VisibleForTesting
    void setClient(RestClient client) {
        this.client = client;
    }

    private void checkNotFailed() throws Exception {
        if (this.irrecoverableError.get() != null) {
            throw this.irrecoverableError.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkIndexExists(Record<GenericObject> record) throws IOException {
        if (!this.config.isCreateIndexIfNeeded()) {
            return;
        }
        String indexName = this.indexName(record);
        if (!this.indexCache.contains(indexName)) {
            ElasticSearchClient elasticSearchClient = this;
            synchronized (elasticSearchClient) {
                if (!this.indexCache.contains(indexName)) {
                    this.createIndexIfNeeded(indexName);
                    this.indexCache.add(indexName);
                }
            }
        }
    }

    String indexName(Record<GenericObject> record) throws IOException {
        if (this.indexNameFormatter != null) {
            return this.indexNameFormatter.indexName(record);
        }
        if (!record.getTopicName().isPresent()) {
            throw new IOException("Elasticsearch index name configuration and topic name are empty");
        }
        return this.topicToIndexName((String)record.getTopicName().get());
    }

    @VisibleForTesting
    public String topicToIndexName(String topicName) {
        return this.topicToIndexCache.computeIfAbsent(topicName, k -> {
            String indexName = topicName.toLowerCase(Locale.ROOT);
            String[] parts = indexName.split("/");
            if (parts.length > 1) {
                indexName = parts[parts.length - 1];
            }
            while (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
                indexName = indexName.substring(0, indexName.length() - 1);
            }
            if (indexName.length() <= 0 || !indexName.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*")) {
                throw new RuntimeException(new IOException("Cannot convert the topic name='" + topicName + "' to a valid elasticsearch index name"));
            }
            if (log.isDebugEnabled()) {
                log.debug("Translate topic={} to index={}", k, (Object)indexName);
            }
            return indexName;
        });
    }

    @VisibleForTesting
    public boolean createIndexIfNeeded(String indexName) {
        if (this.indexExists(indexName)) {
            return false;
        }
        return this.retry(() -> this.client.createIndex(indexName), "create index");
    }

    public boolean indexExists(String indexName) {
        return this.retry(() -> this.client.indexExists(indexName), "index exists");
    }

    private <T> T retry(Callable<T> callable, String source) {
        return this.retry(callable, this.config.getMaxRetries(), source);
    }

    private <T> T retry(Callable<T> callable, int maxRetries, String source) {
        try {
            return this.backoffRetry.retry(callable, maxRetries, this.config.getRetryBackoffInMs(), source);
        }
        catch (Exception e) {
            log.error("error in command {} wth retry", (Object)source, (Object)e);
            throw new ElasticSearchConnectionException(source + " failed", e);
        }
    }

    RestClient getRestClient() {
        return this.client;
    }
}

