package com.hazelcast.jet.elastic.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.logging.ILogger;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.http.HttpHost;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.slice.SliceBuilder;

/* loaded from: input_file:com/hazelcast/jet/elastic/impl/ElasticSourceP.class */
final class ElasticSourceP<T> extends AbstractProcessor {
    private final ElasticSourceConfiguration<T> configuration;
    private final List<Shard> shards;
    private RestHighLevelClient client;
    private ILogger logger;
    private Traverser<T> traverser;
    private ElasticScrollTraverser scrollTraverser;

    /* loaded from: input_file:com/hazelcast/jet/elastic/impl/ElasticSourceP$ElasticScrollTraverser.class */
    static class ElasticScrollTraverser implements Traverser<SearchHit> {
        private final ILogger logger;
        private final RestHighLevelClient client;
        private final FunctionEx<? super ActionRequest, RequestOptions> optionsFn;
        private final String scrollKeepAlive;
        private final int retries;
        private SearchHits hits;
        private int nextHit;
        private String scrollId;

        ElasticScrollTraverser(ElasticSourceConfiguration<?> elasticSourceConfiguration, RestHighLevelClient restHighLevelClient, SearchRequest searchRequest, ILogger iLogger) {
            this.client = restHighLevelClient;
            this.optionsFn = elasticSourceConfiguration.optionsFn();
            this.scrollKeepAlive = elasticSourceConfiguration.scrollKeepAlive();
            this.retries = elasticSourceConfiguration.retries();
            this.logger = iLogger;
            try {
                RequestOptions requestOptions = (RequestOptions) this.optionsFn.apply(searchRequest);
                SearchResponse searchResponse = (SearchResponse) RetryUtils.withRetry(() -> {
                    return restHighLevelClient.search(searchRequest, requestOptions);
                }, this.retries);
                this.hits = (SearchHits) Objects.requireNonNull(searchResponse.getHits(), "null hits in the response");
                this.scrollId = searchResponse.getScrollId();
                if (this.scrollId == null && this.hits.getHits().length > 0) {
                    throw new IllegalStateException("Unexpected response: returned scrollId is null, but hits.length is not zero (" + this.hits.getHits().length + "). Please file a bug.");
                }
                TotalHits totalHits = this.hits.getTotalHits();
                if (totalHits != null) {
                    iLogger.fine("Initialized scroll with scrollId " + this.scrollId + ", total results " + totalHits.relation + ", " + totalHits.value);
                }
            } catch (Exception e) {
                throw new JetException("Could not execute SearchRequest to Elastic", e);
            }
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public SearchHit m1next() {
            if (this.hits.getHits().length == 0) {
                this.scrollId = null;
                return null;
            }
            if (this.nextHit >= this.hits.getHits().length) {
                try {
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(this.scrollId);
                    searchScrollRequest.scroll(this.scrollKeepAlive);
                    this.hits = ((SearchResponse) RetryUtils.withRetry(() -> {
                        return this.client.scroll(searchScrollRequest, (RequestOptions) this.optionsFn.apply(searchScrollRequest));
                    }, this.retries)).getHits();
                    if (this.hits.getHits().length == 0) {
                        return null;
                    }
                    this.nextHit = 0;
                } catch (Exception e) {
                    throw new JetException("Could not execute SearchScrollRequest to Elastic", e);
                }
            }
            SearchHits searchHits = this.hits;
            int i = this.nextHit;
            this.nextHit = i + 1;
            return searchHits.getAt(i);
        }

        public void close() {
            if (this.scrollId != null) {
                clearScroll(this.scrollId);
                this.scrollId = null;
            }
        }

        private void clearScroll(String str) {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(str);
            try {
                ClearScrollResponse clearScrollResponse = (ClearScrollResponse) RetryUtils.withRetry(() -> {
                    return this.client.clearScroll(clearScrollRequest, (RequestOptions) this.optionsFn.apply(clearScrollRequest));
                }, this.retries);
                if (clearScrollResponse.isSucceeded()) {
                    this.logger.fine("Succeeded clearing " + clearScrollResponse.getNumFreed() + " scrolls");
                } else {
                    this.logger.warning("Clearing scroll " + str + " failed");
                }
            } catch (Exception e) {
                this.logger.fine("Could not clear scroll with scrollId=" + str, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the processor; no client is created and no request is sent
     * until {@code init} runs.
     *
     * @param elasticSourceConfiguration configuration of the source
     * @param list                       shards assigned to this processor
     */
    public ElasticSourceP(ElasticSourceConfiguration<T> elasticSourceConfiguration, List<Shard> list) {
        this.shards = list;
        this.configuration = elasticSourceConfiguration;
    }

    /**
     * Creates the client, prepares the search request and sets up the
     * traverser that {@code complete()} emits from.
     *
     * <p>With slicing enabled the request is sliced by processor index: the
     * member-local index and parallelism are used for co-located reading, the
     * global ones otherwise. With co-located reading enabled the client is
     * restricted to the single node holding the assigned shards and the
     * request is limited to those shards; when no shards are assigned an empty
     * traverser is used and no request is sent.
     *
     * @param context the processor context
     * @throws Exception if the initialization fails
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        this.logger = context.logger();
        this.logger.fine("init");
        this.client = (RestHighLevelClient) this.configuration.clientFn().get();
        SearchRequest searchRequest = (SearchRequest) this.configuration.searchRequestFn().get();
        searchRequest.scroll(this.configuration.scrollKeepAlive());

        if (this.configuration.isSlicingEnabled()) {
            boolean coLocated = this.configuration.isCoLocatedReadingEnabled();
            int sliceId = coLocated ? context.localProcessorIndex() : context.globalProcessorIndex();
            int sliceCount = coLocated ? context.localParallelism() : context.totalParallelism();
            // A single slice is the same as no slicing, so the request is left untouched.
            if (sliceCount > 1) {
                this.logger.fine("Slice id=" + sliceId + ", max=" + sliceCount);
                searchRequest.source().slice(new SliceBuilder(sliceId, sliceCount));
            }
        }

        if (this.configuration.isCoLocatedReadingEnabled()) {
            this.logger.fine("Assigned shards: " + this.shards);
            if (this.shards.isEmpty()) {
                this.traverser = Traversers.empty();
                return;
            }
            this.client.getLowLevelClient().setNodes(Collections.singleton(createLocalElasticNode()));
            String shardIds = this.shards.stream()
                    .map(shard -> String.valueOf(shard.getShard()))
                    .collect(Collectors.joining(","));
            searchRequest.preference("_shards:" + shardIds + "|_only_local");
        }

        this.scrollTraverser = new ElasticScrollTraverser(this.configuration, this.client, searchRequest, this.logger);
        this.traverser = this.scrollTraverser.map(this.configuration.mapToItemFn());
    }

    /**
     * Builds the client {@link Node} for the node that holds the assigned
     * shards, using the distinct HTTP addresses of those shards.
     *
     * @return a node created from the single distinct HTTP address
     * @throws JetException if the shards do not share exactly one HTTP address
     */
    private Node createLocalElasticNode() {
        List<String> httpAddresses = this.shards.stream()
                .map(Shard::getHttpAddress)
                .distinct()
                .collect(Collectors.toList());
        if (httpAddresses.size() != 1) {
            throw new JetException("Should receive shards from single local node, got: " + httpAddresses);
        }
        return new Node(HttpHost.create(httpAddresses.get(0)));
    }

    /**
     * Declares this processor as non-cooperative.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Emits items from the traverser set up in {@code init}.
     *
     * @return the result of {@code emitFromTraverser} for that traverser
     */
    @Override
    public boolean complete() {
        boolean done = emitFromTraverser(this.traverser);
        return done;
    }

    /**
     * Releases the resources held by this processor: closes the scroll
     * traverser, if one was created, and then the client.
     *
     * <p>Both fields are {@code null} when {@code init} did not get far enough
     * to assign them, so each is checked before use. A failure to close the
     * client is only logged.
     *
     * @throws Exception if closing the scroll traverser fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.scrollTraverser != null) {
                this.scrollTraverser.close();
            }
        } finally {
            // The client is closed even when closing the traverser throws.
            // init assigns the logger before the client, so a non-null client implies a non-null logger.
            if (this.client != null) {
                try {
                    this.client.close();
                } catch (Exception e) {
                    this.logger.fine("Could not close client", e);
                }
            }
        }
    }
}
