/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.register.client.server.etcd.client;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.grpc.stub.StreamObserver;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.register.client.server.etcd.client.EtcdListenHandler;
import org.apache.shenyu.register.client.server.etcd.client.Event;
import org.apache.shenyu.register.client.server.etcd.client.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtcdClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClient.class);
    private static final int EPHEMERAL_LEASE = 60;
    private static final int DEFAULT_CORE_POOL_SIZE = 10;
    private static final int DEFAULT_QUEUE_SIZE = 1000;
    private final ThreadPoolExecutor defaultPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1000), ShenyuThreadFactory.create((String)"etcd register center watch-", (boolean)true));
    private final Client client;

    public EtcdClient(String urls) {
        this.client = Client.builder().endpoints(urls.split(",")).build();
        try {
            this.initLease();
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("initLease error.", (Throwable)e);
        }
    }

    private void initLease() throws ExecutionException, InterruptedException {
        Lease lease = this.client.getLeaseClient();
        LeaseGrantResponse response = (LeaseGrantResponse)lease.grant(60L).get();
        long leaseId = response.getID();
        lease.keepAlive(leaseId, (StreamObserver)new StreamObserver<LeaseKeepAliveResponse>(){

            public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
            }

            public void onError(Throwable throwable) {
                LOGGER.error("keep alive error", throwable);
            }

            public void onCompleted() {
            }
        });
    }

    public String read(String key) {
        KV kv = this.client.getKVClient();
        ByteSequence storeKey = ByteSequence.from((String)key, (Charset)StandardCharsets.UTF_8);
        GetResponse response = null;
        try {
            response = (GetResponse)kv.get(storeKey).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("read(key:{}) error.", (Object)key, (Object)e);
        }
        if (Objects.isNull(response)) {
            return null;
        }
        LOGGER.debug(String.valueOf(response.getHeader()));
        Node info = response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
        assert (info != null);
        return info.getValue();
    }

    public List<String> getChildren(String path) {
        try {
            return this.listKeys(path);
        }
        catch (InterruptedException | ExecutionException e) {
            LOGGER.error("getChildren(path:{}) error.", (Object)path, (Object)e);
            return null;
        }
    }

    private List<String> listKeys(String prefix) throws ExecutionException, InterruptedException {
        KV kv = this.client.getKVClient();
        ByteSequence storePrefix = ByteSequence.from((String)prefix, (Charset)StandardCharsets.UTF_8);
        GetOption option = GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
        GetResponse response = (GetResponse)kv.get(storePrefix, option).get();
        return response.getKvs().stream().map(o -> o.getKey().toString()).filter(k -> !k.equals(prefix)).collect(Collectors.toList());
    }

    public void subscribeChildChanges(String key, EtcdListenHandler handler) {
        this.defaultPoolExecutor.execute(() -> {
            Stoppable stoppable = new Stoppable();
            try {
                this.watchChildren(key, stoppable, handler);
            }
            catch (Exception e) {
                stoppable.stop();
                LOGGER.warn(String.format("Watch exception of %s", "/s"), (Throwable)e);
            }
        });
    }

    private void watchChildren(String key, Supplier<Boolean> exitSignSupplier, BiConsumer<Event, Node> consumer) throws InterruptedException {
        ByteSequence storeKey = ByteSequence.from((String)key, (Charset)StandardCharsets.UTF_8);
        Watch.Listener listener = this.watch(exitSignSupplier, storeKey, consumer);
        WatchOption option = WatchOption.newBuilder().withPrefix(ByteSequence.from((String)key, (Charset)StandardCharsets.UTF_8)).build();
        Watch.Watcher watch = this.client.getWatchClient().watch(ByteSequence.from((String)key, (Charset)StandardCharsets.UTF_8), option, listener);
        watch.close();
    }

    private Watch.Listener watch(Supplier<Boolean> exitSignSupplier, ByteSequence storeKey, BiConsumer<Event, Node> consumer) {
        return Watch.listener(response -> {
            while (!((Boolean)exitSignSupplier.get()).booleanValue()) {
                for (WatchEvent watchEvent : response.getEvents()) {
                    Event event;
                    KeyValue keyValue = watchEvent.getKeyValue();
                    Node info = EtcdClient.kv2NodeInfo(keyValue);
                    if (watchEvent.getKeyValue().getKey().equals((Object)storeKey)) {
                        return;
                    }
                    switch (watchEvent.getEventType()) {
                        case PUT: {
                            event = Event.UPDATE;
                            break;
                        }
                        case DELETE: {
                            event = Event.DELETE;
                            break;
                        }
                        default: {
                            event = Event.UNRECOGNIZED;
                        }
                    }
                    consumer.accept(event, info);
                }
            }
        });
    }

    static Node kv2NodeInfo(KeyValue kv) {
        String key = kv.getKey().toString();
        String value = Optional.ofNullable(kv.getValue()).map(ByteSequence::toString).orElse("");
        return new Node(key, value, kv.getCreateRevision(), kv.getModRevision(), kv.getVersion());
    }

    public void close() {
        Optional.ofNullable(this.client).ifPresent(Client::close);
    }

    static class Stoppable
    implements Supplier<Boolean> {
        private boolean exit;

        Stoppable() {
        }

        @Override
        public Boolean get() {
            return this.exit;
        }

        void stop() {
            this.exit = true;
        }
    }
}

