package com.alibaba.nacos.istio.xds;

import com.alibaba.nacos.istio.api.ApiConstants;
import com.alibaba.nacos.istio.api.ApiGeneratorFactory;
import com.alibaba.nacos.istio.common.AbstractConnection;
import com.alibaba.nacos.istio.common.Event;
import com.alibaba.nacos.istio.common.NacosResourceManager;
import com.alibaba.nacos.istio.common.ResourceSnapshot;
import com.alibaba.nacos.istio.common.WatchedStatus;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.util.NonceGenerator;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/nacos/istio/xds/NacosXdsService.class */
public class NacosXdsService extends AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase {
    private final Map<String, AbstractConnection<DiscoveryResponse>> connections = new ConcurrentHashMap(16);

    @Autowired
    ApiGeneratorFactory apiGeneratorFactory;

    @Autowired
    NacosResourceManager resourceManager;

    public boolean hasClientConnection() {
        return this.connections.size() != 0;
    }

    public StreamObserver<DiscoveryRequest> streamAggregatedResources(final StreamObserver<DiscoveryResponse> streamObserver) {
        this.resourceManager.initResourceSnapshot();
        final XdsConnection xdsConnection = new XdsConnection(streamObserver);
        return new StreamObserver<DiscoveryRequest>() { // from class: com.alibaba.nacos.istio.xds.NacosXdsService.1
            private boolean initRequest = true;

            public void onNext(DiscoveryRequest discoveryRequest) {
                if (this.initRequest) {
                    xdsConnection.setConnectionId(discoveryRequest.getNode().getId());
                    NacosXdsService.this.connections.put(xdsConnection.getConnectionId(), xdsConnection);
                    this.initRequest = false;
                }
                NacosXdsService.this.process(discoveryRequest, xdsConnection);
            }

            public void onError(Throwable th) {
                Loggers.MAIN.error("xds: {} stream error.", xdsConnection.getConnectionId(), th);
                clear();
            }

            public void onCompleted() {
                Loggers.MAIN.info("xds: {} stream close.", xdsConnection.getConnectionId());
                streamObserver.onCompleted();
                clear();
            }

            private void clear() {
                NacosXdsService.this.connections.remove(xdsConnection.getConnectionId());
            }
        };
    }

    public void process(DiscoveryRequest discoveryRequest, AbstractConnection<DiscoveryResponse> abstractConnection) {
        if (shouldPush(discoveryRequest, abstractConnection)) {
            abstractConnection.push(buildDiscoveryResponse(discoveryRequest.getTypeUrl(), this.resourceManager.getResourceSnapshot()), abstractConnection.getWatchedStatusByType(discoveryRequest.getTypeUrl()));
        }
    }

    private boolean shouldPush(DiscoveryRequest discoveryRequest, AbstractConnection<DiscoveryResponse> abstractConnection) {
        String typeUrl = discoveryRequest.getTypeUrl();
        String connectionId = abstractConnection.getConnectionId();
        if (typeUrl.equals(ApiConstants.MESH_CONFIG_PROTO_PACKAGE)) {
            Loggers.MAIN.info("xds: type {} should be ignored.", typeUrl);
            return false;
        }
        if (discoveryRequest.getErrorDetail().getCode() != 0) {
            Loggers.MAIN.error("xds: ACK error, connection-id: {}, code: {}, message: {}", new Object[]{connectionId, Integer.valueOf(discoveryRequest.getErrorDetail().getCode()), discoveryRequest.getErrorDetail().getMessage()});
            return false;
        }
        if (discoveryRequest.getResponseNonce().isEmpty()) {
            Loggers.MAIN.info("xds: init request, type {}, connection-id {}, version {}", new Object[]{typeUrl, connectionId, discoveryRequest.getVersionInfo()});
            WatchedStatus watchedStatus = new WatchedStatus();
            watchedStatus.setType(discoveryRequest.getTypeUrl());
            abstractConnection.addWatchedResource(discoveryRequest.getTypeUrl(), watchedStatus);
            return true;
        }
        WatchedStatus watchedStatusByType = abstractConnection.getWatchedStatusByType(discoveryRequest.getTypeUrl());
        if (watchedStatusByType == null) {
            Loggers.MAIN.info("xds: reconnect, type {}, connection-id {}, version {}, nonce {}.", new Object[]{typeUrl, connectionId, discoveryRequest.getVersionInfo(), discoveryRequest.getResponseNonce()});
            WatchedStatus watchedStatus2 = new WatchedStatus();
            watchedStatus2.setType(discoveryRequest.getTypeUrl());
            abstractConnection.addWatchedResource(discoveryRequest.getTypeUrl(), watchedStatus2);
            return true;
        }
        if (!watchedStatusByType.getLatestNonce().equals(discoveryRequest.getResponseNonce())) {
            Loggers.MAIN.warn("xds: request dis match, type {}, connection-id {}", discoveryRequest.getTypeUrl(), abstractConnection.getConnectionId());
            return false;
        }
        watchedStatusByType.setAckedVersion(discoveryRequest.getVersionInfo());
        watchedStatusByType.setAckedNonce(discoveryRequest.getResponseNonce());
        Loggers.MAIN.info("xds: ack, type {}, connection-id {}, version {}, nonce {}", new Object[]{typeUrl, connectionId, discoveryRequest.getVersionInfo(), discoveryRequest.getResponseNonce()});
        return false;
    }

    public void handleEvent(ResourceSnapshot resourceSnapshot, Event event) {
        switch (event.getType()) {
            case Service:
                if (this.connections.size() == 0) {
                    return;
                }
                Loggers.MAIN.info("xds: event {} trigger push.", event.getType());
                DiscoveryResponse buildDiscoveryResponse = buildDiscoveryResponse(ApiConstants.SERVICE_ENTRY_PROTO_PACKAGE, resourceSnapshot);
                for (AbstractConnection<DiscoveryResponse> abstractConnection : this.connections.values()) {
                    WatchedStatus watchedStatusByType = abstractConnection.getWatchedStatusByType(ApiConstants.SERVICE_ENTRY_PROTO_PACKAGE);
                    if (watchedStatusByType != null) {
                        abstractConnection.push(buildDiscoveryResponse, watchedStatusByType);
                    }
                }
                return;
            case Endpoint:
                Loggers.MAIN.warn("Currently, endpoint event is not supported.");
                return;
            default:
                Loggers.MAIN.warn("Invalid event {}, ignore it.", event.getType());
                return;
        }
    }

    private DiscoveryResponse buildDiscoveryResponse(String str, ResourceSnapshot resourceSnapshot) {
        List<?> generate = this.apiGeneratorFactory.getApiGenerator(str).generate(resourceSnapshot);
        return DiscoveryResponse.newBuilder().setTypeUrl(str).addAllResources(generate).setVersionInfo(resourceSnapshot.getVersion()).setNonce(NonceGenerator.generateNonce()).build();
    }
}
