package org.apache.dubbo.registry.xds.util;

import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
import org.apache.dubbo.rpc.model.ApplicationModel;

/* loaded from: input_file:org/apache/dubbo/registry/xds/util/AdsObserver.class */
public class AdsObserver {
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AdsObserver.class);
    private final ApplicationModel applicationModel;
    private final URL url;
    private final Node node;
    private volatile XdsChannel xdsChannel;
    protected StreamObserver<DiscoveryRequest> requestObserver;
    private final Map<String, XdsListener> listeners = new ConcurrentHashMap();
    private final Map<String, DiscoveryRequest> observedResources = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/dubbo/registry/xds/util/AdsObserver$ResponseObserver.class */
    public static class ResponseObserver implements StreamObserver<DiscoveryResponse> {
        private AdsObserver adsObserver;

        public ResponseObserver(AdsObserver adsObserver) {
            this.adsObserver = adsObserver;
        }

        public void onNext(DiscoveryResponse discoveryResponse) {
            ((XdsListener) this.adsObserver.listeners.get(discoveryResponse.getTypeUrl())).process(discoveryResponse);
            this.adsObserver.requestObserver.onNext(buildAck(discoveryResponse));
        }

        protected DiscoveryRequest buildAck(DiscoveryResponse discoveryResponse) {
            return DiscoveryRequest.newBuilder().setNode(this.adsObserver.node).setTypeUrl(discoveryResponse.getTypeUrl()).setVersionInfo(discoveryResponse.getVersionInfo()).setResponseNonce(discoveryResponse.getNonce()).addAllResourceNames(((DiscoveryRequest) this.adsObserver.observedResources.get(discoveryResponse.getTypeUrl())).getResourceNamesList()).build();
        }

        public void onError(Throwable th) {
            AdsObserver.logger.error("1-30", "", "", "xDS Client received error message! detail:", th);
            this.adsObserver.triggerReConnectTask();
        }

        public void onCompleted() {
            AdsObserver.logger.info("xDS Client completed");
            this.adsObserver.triggerReConnectTask();
        }
    }

    public AdsObserver(URL url, Node node) {
        this.url = url;
        this.node = node;
        this.xdsChannel = new XdsChannel(url);
        this.applicationModel = url.getOrDefaultApplicationModel();
    }

    public <T, S extends DeltaResource<T>> void addListener(AbstractProtocol<T, S> abstractProtocol) {
        this.listeners.put(abstractProtocol.getTypeUrl(), abstractProtocol);
    }

    public void request(DiscoveryRequest discoveryRequest) {
        if (this.requestObserver == null) {
            this.requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
        }
        this.requestObserver.onNext(discoveryRequest);
        this.observedResources.put(discoveryRequest.getTypeUrl(), discoveryRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void triggerReConnectTask() {
        ((FrameworkExecutorRepository) this.applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class)).getSharedScheduledExecutor().schedule(this::recover, 3L, TimeUnit.SECONDS);
    }

    private void recover() {
        try {
            this.xdsChannel = new XdsChannel(this.url);
        } catch (Exception e) {
            logger.error("1-30", "", "", "Recover failed for xDS connection. Will retry.", e);
        }
        if (this.xdsChannel.getChannel() == null) {
            logger.error("1-30", "", "", "Recover failed for xDS connection. Will retry. Create channel failed.");
            triggerReConnectTask();
            return;
        }
        this.requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
        Collection<DiscoveryRequest> values = this.observedResources.values();
        StreamObserver<DiscoveryRequest> streamObserver = this.requestObserver;
        streamObserver.getClass();
        values.forEach((v1) -> {
            r1.onNext(v1);
        });
    }
}
