package com.ontotext.raft;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import com.ontotext.graphdb.Config;
import com.ontotext.graphdb.raft.node.ClusterFactory;
import com.ontotext.raft.config.ClusterConfig;
import com.ontotext.raft.config.ClusterConfigService;
import common.GraphDBMDCExecutorBuilder;
import io.grpc.ManagedChannel;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ontotext/raft/RpcMulticastService.class */
public class RpcMulticastService<T extends AbstractStub<T>> {
    private Logger logger;
    private Function<ManagedChannel, T> targetService;
    private ExecutorService executorService;
    private Supplier<ClusterConfigService> clusterConfigServiceProvider;

    /* loaded from: input_file:com/ontotext/raft/RpcMulticastService$Builder.class */
    public static class Builder {
        public static final String DEFAULT_THREAD_NAME = "rpc-multicast-%d";
        private String threadPattern;
        private ExecutorService executorService;
        private Logger logger;
        private Supplier<ClusterConfigService> clusterConfigServiceProvider;

        public Builder setThreadPattern(String str) {
            this.threadPattern = str;
            return this;
        }

        public Builder setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public Builder setLogger(Logger logger) {
            this.logger = logger;
            return this;
        }

        public Builder setClusterConfigServiceProvider(Supplier<ClusterConfigService> supplier) {
            this.clusterConfigServiceProvider = supplier;
            return this;
        }

        public <T extends AbstractStub<T>> RpcMulticastService<T> build(Function<ManagedChannel, T> function) {
            return new RpcMulticastService<>(function, this);
        }

        Logger buildLogger() {
            return this.logger != null ? this.logger : LoggerFactory.getLogger(RpcMulticastService.class);
        }

        ExecutorService buildExecutorService() {
            if (this.executorService != null) {
                return this.executorService;
            }
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder().setDaemon(true).setPriority(4).setNameFormat(Objects.toString(this.threadPattern, DEFAULT_THREAD_NAME)).build());
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            this.executorService = threadPoolExecutor;
            return threadPoolExecutor;
        }

        Supplier<ClusterConfigService> buildClusterConfigServiceProvider() {
            return this.clusterConfigServiceProvider != null ? this.clusterConfigServiceProvider : ClusterConfigService::defaultService;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ontotext/raft/RpcMulticastService$ClosingStreamObserver.class */
    public static class ClosingStreamObserver<M> implements StreamObserver<M> {
        private final ManagedChannel managedChannel;
        private final StreamObserver<M> delegate;

        private ClosingStreamObserver(ManagedChannel managedChannel, StreamObserver<M> streamObserver) {
            this.managedChannel = managedChannel;
            this.delegate = streamObserver;
        }

        public void onNext(M m) {
            this.delegate.onNext(m);
        }

        public void onError(Throwable th) {
            try {
                this.delegate.onError(th);
            } finally {
                this.managedChannel.shutdown();
            }
        }

        public void onCompleted() {
            try {
                this.delegate.onCompleted();
            } finally {
                this.managedChannel.shutdown();
            }
        }
    }

    private RpcMulticastService(Function<ManagedChannel, T> function, Builder builder) {
        this.targetService = function;
        this.logger = builder.buildLogger();
        this.executorService = GraphDBMDCExecutorBuilder.build(builder.buildExecutorService());
        this.clusterConfigServiceProvider = builder.buildClusterConfigServiceProvider();
    }

    public static Builder builder() {
        return new Builder();
    }

    public <R, E> E callPeersBlocking(Function<T, ListenableFuture<R>> function, Function<Map<String, ListenableFuture<R>>, E> function2) {
        Map<String, ListenableFuture<R>> callPeers = callPeers(function);
        return (E) callWhenAllFuturesAreDone(callPeers.values(), () -> {
            return function2.apply(callPeers);
        });
    }

    @Nullable
    public <R, E> E callPeersBlocking(List<String> list, Function<T, ListenableFuture<R>> function, Function<Map<String, ListenableFuture<R>>, E> function2) {
        Map<String, ListenableFuture<R>> callPeers = callPeers(list, function);
        return (E) callWhenAllFuturesAreDone(callPeers.values(), () -> {
            return function2.apply(callPeers);
        });
    }

    @NotNull
    public <R> Map<String, ListenableFuture<R>> callPeers(Function<T, ListenableFuture<R>> function) {
        return callPeers((List) getClusterConfig().streamRpcNodes().collect(Collectors.toList()), function);
    }

    @NotNull
    public <R> Map<String, ListenableFuture<R>> callPeers(List<String> list, Function<T, ListenableFuture<R>> function) {
        ArrayList arrayList = new ArrayList(list);
        arrayList.remove(getCurrentRPCAddress());
        return callNodes(arrayList, function);
    }

    @NotNull
    public <R> Map<String, ListenableFuture<R>> callNodes(List<String> list, Function<T, ListenableFuture<R>> function) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(list.size());
        for (String str : list) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("{} contacting {}", getCurrentRPCAddress(), str);
            }
            this.logger.info("{} contacting {}", getCurrentRPCAddress(), str);
            ManagedChannel build = ClusterFactory.createChannelTo(str).executor(this.executorService).build();
            ListenableFuture<R> apply = function.apply(this.targetService.apply(build));
            Objects.requireNonNull(build);
            apply.addListener(build::shutdown, this.executorService);
            linkedHashMap.put(str, apply);
        }
        return linkedHashMap;
    }

    @Nullable
    public <R extends Message, M extends Message> Map<String, Pair<List<R>, Throwable>> callPeersBlocking(Function<T, Function<StreamObserver<R>, StreamObserver<M>>> function, Iterable<M> iterable) {
        return callPeersBlocking((List<String>) getClusterConfig().streamRpcNodes().collect(Collectors.toList()), iterable, function);
    }

    @Nullable
    public <R extends Message, M extends Message> Map<String, Pair<List<R>, Throwable>> callPeersBlocking(List<String> list, Iterable<M> iterable, Function<T, Function<StreamObserver<R>, StreamObserver<M>>> function) {
        return collectStreamResults(callPeers(list, iterable, function, CollectingObserver::new));
    }

    public <R extends Message> Map<String, Pair<List<R>, Throwable>> collectStreamResults(Map<String, CollectingObserver<R>> map) {
        LinkedList linkedList = new LinkedList(map.values());
        while (!linkedList.isEmpty()) {
            linkedList.removeIf(collectingObserver -> {
                try {
                    return collectingObserver.waitToFinish(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return true;
                }
            });
        }
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Pair.of(((CollectingObserver) entry.getValue()).getMessages(), ((CollectingObserver) entry.getValue()).getError());
        }));
    }

    @NotNull
    public <R extends Message, M extends Message, O extends StreamObserver<R>> Map<String, O> callPeers(Function<T, Function<StreamObserver<R>, StreamObserver<M>>> function, Iterable<M> iterable, Supplier<O> supplier) {
        return callPeers((List) getClusterConfig().streamRpcNodes().collect(Collectors.toList()), iterable, function, supplier);
    }

    @NotNull
    public <R extends Message, M extends Message, O extends StreamObserver<R>> Map<String, O> callPeers(List<String> list, Iterable<M> iterable, Function<T, Function<StreamObserver<R>, StreamObserver<M>>> function, Supplier<O> supplier) {
        ArrayList<String> arrayList = new ArrayList(list);
        arrayList.remove(getCurrentRPCAddress());
        LinkedHashMap linkedHashMap = new LinkedHashMap(arrayList.size());
        for (String str : arrayList) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("{} contacting {}", getCurrentRPCAddress(), str);
            }
            ManagedChannel build = ClusterFactory.createChannelTo(str).executor(this.executorService).build();
            Function<StreamObserver<R>, StreamObserver<M>> apply = function.apply(this.targetService.apply(build));
            O o = supplier.get();
            StreamObserver<M> streamObserver = null;
            try {
                streamObserver = apply.apply(new ClosingStreamObserver<>(build, o));
                Objects.requireNonNull(streamObserver);
                iterable.forEach((v1) -> {
                    r1.onNext(v1);
                });
                streamObserver.onCompleted();
                linkedHashMap.put(str, o);
            } catch (RuntimeException e) {
                if (streamObserver != null) {
                    streamObserver.onError(e);
                }
                throw e;
            }
        }
        return linkedHashMap;
    }

    @Nullable
    public <R, E> E callWhenAllFuturesAreDone(Collection<ListenableFuture<R>> collection, Callable<E> callable) {
        try {
            return (E) Futures.whenAllComplete(collection).call(callable, this.executorService).get();
        } catch (InterruptedException e) {
            this.logger.warn("Interrupted while waiting for peer status");
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e2) {
            this.logger.warn("Fail to process collected peer response", e2);
            return null;
        }
    }

    @Nonnull
    public String getCurrentRPCAddress() {
        ClusterConfig fetchClusterConfig = this.clusterConfigServiceProvider.get().fetchClusterConfig();
        return fetchClusterConfig == null ? Config.getRPCAddress() : fetchClusterConfig.getExternalAddress().getRpcAddress();
    }

    @NotNull
    private ClusterConfig getClusterConfig() {
        ClusterConfig fetchClusterConfig = this.clusterConfigServiceProvider.get().fetchClusterConfig();
        if (fetchClusterConfig == null) {
            throw new IllegalStateException("Cluster not enabled");
        }
        return fetchClusterConfig;
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public void shutdown() {
        this.executorService.shutdown();
    }
}
