package org.springframework.data.elasticsearch.client.reactive;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.class */
public class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
    private static final Log LOGGER = LogFactory.getLog(MultiNodeHostProvider.class);
    private final WebClientProvider clientProvider;
    private final Map<InetSocketAddress, ElasticsearchHost> hosts = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiNodeHostProvider(WebClientProvider webClientProvider, InetSocketAddress... inetSocketAddressArr) {
        this.clientProvider = webClientProvider;
        for (InetSocketAddress inetSocketAddress : inetSocketAddressArr) {
            this.hosts.put(inetSocketAddress, new ElasticsearchHost(inetSocketAddress, ElasticsearchHost.State.UNKNOWN));
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("initialized with " + this.hosts);
        }
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public Mono<HostProvider.ClusterInformation> clusterInfo() {
        return checkNodes(null).map(this::updateNodeState).buffer(this.hosts.size()).then(Mono.just(new HostProvider.ClusterInformation(new LinkedHashSet(this.hosts.values()))));
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public WebClient createWebClient(InetSocketAddress inetSocketAddress) {
        return this.clientProvider.get(inetSocketAddress);
    }

    @Override // org.springframework.data.elasticsearch.client.reactive.HostProvider
    public Mono<InetSocketAddress> lookupActiveHost(HostProvider.Verification verification) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("lookupActiveHost " + verification + " from " + hosts());
        }
        if (HostProvider.Verification.LAZY.equals(verification)) {
            for (ElasticsearchHost elasticsearchHost : hosts()) {
                if (elasticsearchHost.isOnline()) {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("lookupActiveHost returning " + elasticsearchHost);
                    }
                    return Mono.just(elasticsearchHost.getEndpoint());
                }
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("no online host found with LAZY");
            }
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("searching for active host");
        }
        return findActiveHostInKnownActives().switchIfEmpty(findActiveHostInUnresolved()).switchIfEmpty(findActiveHostInDead()).switchIfEmpty(Mono.error(() -> {
            return new NoReachableHostException(new LinkedHashSet(getCachedHostState()));
        }));
    }

    Collection<ElasticsearchHost> getCachedHostState() {
        return this.hosts.values();
    }

    private Mono<InetSocketAddress> findActiveHostInKnownActives() {
        return findActiveForState(ElasticsearchHost.State.ONLINE);
    }

    private Mono<InetSocketAddress> findActiveHostInUnresolved() {
        return findActiveForState(ElasticsearchHost.State.UNKNOWN);
    }

    private Mono<InetSocketAddress> findActiveHostInDead() {
        return findActiveForState(ElasticsearchHost.State.OFFLINE);
    }

    private Mono<InetSocketAddress> findActiveForState(ElasticsearchHost.State state) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("findActiveForState state " + state + ", current hosts: " + this.hosts);
        }
        return checkNodes(state).map(this::updateNodeState).filter((v0) -> {
            return v0.isOnline();
        }).map(elasticsearchHost -> {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("findActiveForState returning host " + elasticsearchHost);
            }
            return elasticsearchHost;
        }).map((v0) -> {
            return v0.getEndpoint();
        }).takeLast(1).next();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ElasticsearchHost.State> tuple2) {
        ElasticsearchHost elasticsearchHost = new ElasticsearchHost((InetSocketAddress) tuple2.getT1(), (ElasticsearchHost.State) tuple2.getT2());
        this.hosts.put(tuple2.getT1(), elasticsearchHost);
        return elasticsearchHost;
    }

    private Flux<Tuple2<InetSocketAddress, ElasticsearchHost.State>> checkNodes(@Nullable ElasticsearchHost.State state) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("checkNodes() with state " + state);
        }
        return Flux.fromIterable(hosts()).filter(elasticsearchHost -> {
            return state == null || elasticsearchHost.getState().equals(state);
        }).map((v0) -> {
            return v0.getEndpoint();
        }).concatMap(inetSocketAddress -> {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("checking host " + inetSocketAddress);
            }
            return Mono.just(inetSocketAddress).zipWith(createWebClient(inetSocketAddress).head().uri("/", new Object[0]).exchangeToMono((v0) -> {
                return Mono.just(v0);
            }).timeout(Duration.ofSeconds(1L)).doOnError(th -> {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("error checking host " + inetSocketAddress + ", " + th.getMessage());
                }
                this.hosts.put(inetSocketAddress, new ElasticsearchHost(inetSocketAddress, ElasticsearchHost.State.OFFLINE));
                this.clientProvider.getErrorListener().accept(th);
            }).flatMap(clientResponse -> {
                return clientResponse.releaseBody().thenReturn(clientResponse.statusCode().isError() ? ElasticsearchHost.State.OFFLINE : ElasticsearchHost.State.ONLINE);
            }));
        }).map(tuple2 -> {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("check result " + tuple2);
            }
            return tuple2;
        }).onErrorContinue((th, obj) -> {
            this.clientProvider.getErrorListener().accept(th);
        });
    }

    private List<ElasticsearchHost> hosts() {
        ArrayList arrayList = new ArrayList(this.hosts.values());
        Collections.shuffle(arrayList);
        return arrayList;
    }
}
