package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.istio.misc.Loggers;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.Mcp;
import istio.mcp.v1alpha1.ResourceOuterClass;
import istio.mcp.v1alpha1.ResourceSourceGrpc;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/nacos/istio/mcp/NacosMcpService.class */
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {
    private final AtomicInteger connectIdGenerator = new AtomicInteger(0);
    private final Map<Integer, StreamObserver<Mcp.Resources>> connnections = new ConcurrentHashMap(16);
    private Map<String, ResourceOuterClass.Resource> resourceMapCache;

    public void sendResources(Map<String, ResourceOuterClass.Resource> map) {
        this.resourceMapCache = map;
        Loggers.MAIN.info("send resources for mcp,count : {}", Integer.valueOf(map.size()));
        Mcp.Resources generateResponse = generateResponse(map);
        if (Loggers.MAIN.isDebugEnabled()) {
            Loggers.MAIN.debug("mcp resources:{}", generateResponse.toString());
        }
        for (StreamObserver<Mcp.Resources> streamObserver : this.connnections.values()) {
            Loggers.MAIN.info("mcp send to:{}", streamObserver.toString());
            streamObserver.onNext(generateResponse);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mcp.Resources generateResponse(Map<String, ResourceOuterClass.Resource> map) {
        return Mcp.Resources.newBuilder().addAllResources(map.values()).setCollection(CollectionTypes.SERVICE_ENTRY).setNonce(String.valueOf(System.currentTimeMillis())).m1079build();
    }

    @Override // istio.mcp.v1alpha1.ResourceSourceGrpc.ResourceSourceImplBase
    public StreamObserver<Mcp.RequestResources> establishResourceStream(final StreamObserver<Mcp.Resources> streamObserver) {
        final int incrementAndGet = this.connectIdGenerator.incrementAndGet();
        this.connnections.put(Integer.valueOf(incrementAndGet), streamObserver);
        return new StreamObserver<Mcp.RequestResources>() { // from class: com.alibaba.nacos.istio.mcp.NacosMcpService.1
            private final int connectionId;

            {
                this.connectionId = incrementAndGet;
            }

            public void onNext(Mcp.RequestResources requestResources) {
                Loggers.MAIN.info("receiving request, sink: {}, type: {}", requestResources.getSinkNode(), requestResources.getCollection());
                if (requestResources.getErrorDetail() != null && requestResources.getErrorDetail().getCode() != 0) {
                    Loggers.MAIN.error("NACK error code: {}, message: {}", Integer.valueOf(requestResources.getErrorDetail().getCode()), requestResources.getErrorDetail().getMessage());
                    return;
                }
                if (StringUtils.isNotBlank(requestResources.getResponseNonce())) {
                    Loggers.MAIN.info("ACK nonce: {}, type: {}", requestResources.getResponseNonce(), requestResources.getCollection());
                } else if (CollectionTypes.SERVICE_ENTRY.equals(requestResources.getCollection())) {
                    streamObserver.onNext(NacosMcpService.this.generateResponse(NacosMcpService.this.resourceMapCache));
                } else {
                    streamObserver.onNext(Mcp.Resources.newBuilder().setCollection(requestResources.getCollection()).setNonce(String.valueOf(System.currentTimeMillis())).m1079build());
                }
            }

            public void onError(Throwable th) {
                Loggers.MAIN.error("stream error.", th);
                NacosMcpService.this.connnections.remove(Integer.valueOf(this.connectionId));
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }
}
