package com.tencent.polaris.plugins.connector.grpc;

import com.google.protobuf.StringValue;
import com.google.protobuf.TextFormat;
import com.tencent.polaris.annonation.JustForTest;
import com.tencent.polaris.api.exception.ErrorCode;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.exception.ServerCodes;
import com.tencent.polaris.api.exception.ServerErrorResponseException;
import com.tencent.polaris.api.plugin.server.ServerEvent;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask;
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant;
import com.tencent.polaris.specification.api.v1.service.manage.PolarisGRPCGrpc;
import com.tencent.polaris.specification.api.v1.service.manage.RequestProto;
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:com/tencent/polaris/plugins/connector/grpc/SpecStreamClient.class */
public class SpecStreamClient implements StreamObserver<ResponseProto.DiscoverResponse> {
    /** Monitor held by the response, error, cleaner and availability-check paths while they touch {@link #pendingTask}. */
    private final Object clientLock;
    /** Tasks registered on this stream that have not yet been removed by a response, an error or the expiry cleaner. */
    private final Map<ServiceEventKey, SpecTask> pendingTask;
    /** Flipped to {@code true} exactly once by {@link #closeStream(boolean)}; afterwards the stream is reported unavailable. */
    private final AtomicBoolean endStream;
    /** Request-side observer returned by {@code stub.discover(this)}; {@code null} when built through the test constructor. */
    private final StreamObserver<RequestProto.DiscoverRequest> discoverClient;
    /** Connection this stream runs on; {@code null} when built through the test constructor. */
    private final Connection connection;
    /** Identifier of this stream, included in log messages. */
    private final String reqId;
    /** Wall-clock time (ms) of the most recent {@code onNext} call; stays 0 until the first response arrives. */
    private final AtomicLong lastRecvTimeMs;
    /** Wall-clock time (ms) at which this client was constructed. */
    private final long createTimeMs;
    /** Idle threshold (ms) compared against the time since the last response (or since creation) to decide on closing. */
    private final long connectionIdleTimeoutMs;
    /** Single-thread scheduler on which {@code ExpirePendingTaskCleaner} re-schedules itself. */
    private final ScheduledExecutorService executor;
    private static final Logger LOG = LoggerFactory.getLogger(SpecStreamClient.class);
    // NOTE(review): not referenced anywhere in the visible code — presumably the default expiry
    // meant to be handed to ExpirePendingTaskCleaner; confirm before relying on it.
    private static long PENDING_TASK_EXPIRE_MS = 30000;

    /**
     * Periodic job that evicts pending tasks which have waited longer than the configured expiry
     * and asks each of them to retry.
     *
     * <p>The job re-schedules itself on the owning client's executor every 5 seconds, whether or not
     * the check itself threw.
     */
    private static class ExpirePendingTaskCleaner implements Runnable {
        private final SpecStreamClient client;
        private final long pendingTaskExpireMs;

        /**
         * @param specStreamClient client whose pending-task map is inspected
         * @param j                maximum age in milliseconds a pending task may reach before it is evicted
         */
        private ExpirePendingTaskCleaner(SpecStreamClient specStreamClient, long j) {
            this.client = specStreamClient;
            this.pendingTaskExpireMs = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                realCheck();
            } finally {
                // Always queue the next round, even if this one failed.
                this.client.executor.schedule(this, 5L, TimeUnit.SECONDS);
            }
        }

        /**
         * Removes every expired entry from the pending-task map under the client lock, then retries
         * the evicted tasks after the lock has been released (the same ordering
         * {@code exceptionCallback} uses), so task code never runs while the lock is held.
         */
        private void realCheck() {
            final long now = System.currentTimeMillis();
            final List<ServiceUpdateTask> evicted = new ArrayList<>();
            synchronized (this.client.clientLock) {
                // Collect first: the map must not be modified while its entry set is being iterated.
                final Map<ServiceEventKey, ServiceUpdateTask> expired = new HashMap<>();
                for (Map.Entry<ServiceEventKey, SpecTask> entry : this.client.pendingTask.entrySet()) {
                    final SpecTask specTask = entry.getValue();
                    if (now - specTask.submitTimeMs > this.pendingTaskExpireMs) {
                        expired.put(entry.getKey(), specTask.task);
                    }
                }
                for (Map.Entry<ServiceEventKey, ServiceUpdateTask> entry : expired.entrySet()) {
                    LOG.info("[ServerConnector] retry pending task {}, because it's long time to running", entry.getKey());
                    this.client.removePendingTask(entry.getKey());
                    evicted.add(entry.getValue());
                }
            }
            for (ServiceUpdateTask task : evicted) {
                task.retry();
            }
        }
    }

    /**
     * Pairs a pending update task with the wall-clock time at which it was registered,
     * so that its age can be computed later.
     */
    public static class SpecTask {
        private final ServiceUpdateTask task;
        private final long submitTimeMs;

        /**
         * @param pending the task being registered; the submit time is captured at construction
         */
        private SpecTask(ServiceUpdateTask pending) {
            submitTimeMs = System.currentTimeMillis();
            task = pending;
        }
    }

    /**
     * Immutable outcome of validating a discover response, or of a stream-level failure:
     * the affected service event (may be {@code null}), an error code and a description.
     */
    public static class ValidResult {
        final ServiceEventKey serviceEventKey;
        final ErrorCode errorCode;
        final String message;

        /**
         * @param serviceEventKey service event the result refers to, or {@code null} when it is not tied to one
         * @param errorCode       classification of the outcome
         * @param str             human-readable description
         */
        public ValidResult(ServiceEventKey serviceEventKey, ErrorCode errorCode, String str) {
            this.message = str;
            this.errorCode = errorCode;
            this.serviceEventKey = serviceEventKey;
        }

        /** @return the service event this result refers to, possibly {@code null} */
        public ServiceEventKey getServiceEventKey() {
            return serviceEventKey;
        }

        /** @return the error code carried by this result */
        public ErrorCode getErrorCode() {
            return errorCode;
        }

        /** @return the description carried by this result */
        public String getMessage() {
            return message;
        }
    }

    /**
     * Opens a discover stream over the given connection and registers the first task on it.
     *
     * @param connection        connection whose channel carries the stream
     * @param j                 idle timeout in milliseconds after which the stream is considered expired
     * @param serviceUpdateTask first task to place in the pending-task map
     */
    public SpecStreamClient(Connection connection, long j, ServiceUpdateTask serviceUpdateTask) {
        clientLock = new Object();
        pendingTask = new HashMap<>();
        endStream = new AtomicBoolean(false);
        lastRecvTimeMs = new AtomicLong(0L);
        final NamedThreadFactory cleanerThreads = new NamedThreadFactory("clean-expire-pendingtask");
        executor = Executors.newSingleThreadScheduledExecutor(cleanerThreads);
        this.connection = connection;
        connectionIdleTimeoutMs = j;
        createTimeMs = System.currentTimeMillis();
        reqId = GrpcUtil.nextGetInstanceReqId();

        // Build the stub, tag it with this stream's request id, then open the bidirectional stream
        // with this object as the response observer.
        final PolarisGRPCGrpc.PolarisGRPCStub stub = PolarisGRPCGrpc.newStub(connection.getChannel());
        GrpcUtil.attachRequestHeader(stub, reqId);
        discoverClient = stub.discover(this);

        putPendingTask(serviceUpdateTask);
    }

    /**
     * Test-only constructor: creates a client with no connection and no gRPC stream, and starts
     * the expired-pending-task cleaner with the given expiry.
     *
     * @param j maximum age in milliseconds a pending task may reach before the cleaner evicts it
     */
    @JustForTest
    public SpecStreamClient(long j) {
        this.clientLock = new Object();
        this.pendingTask = new HashMap<>();
        this.endStream = new AtomicBoolean(false);
        this.lastRecvTimeMs = new AtomicLong(0L);
        this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("clean-expire-pendingtask"));
        this.connection = null;
        this.connectionIdleTimeoutMs = 0L;
        this.createTimeMs = System.currentTimeMillis();
        this.discoverClient = null;
        this.reqId = UUID.randomUUID().toString();
        // The cleaner needs the owning client to reach its lock, pending map and executor.
        this.executor.schedule(new ExpirePendingTaskCleaner(this, j), 5L, TimeUnit.SECONDS);
    }

    /**
     * Marks the stream as ended and releases the discover slot on the connection. Only the first
     * call has any effect; later calls return immediately.
     *
     * @param z when {@code true}, the request side of the stream is also completed before releasing
     */
    public void closeStream(boolean z) {
        final boolean firstClose = endStream.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }
        if (z) {
            LOG.info("[ServerConnector]connection {} start to closeSend", connection.getConnID());
            discoverClient.onCompleted();
        }
        connection.release(GrpcUtil.OP_KEY_DISCOVER);
    }

    /** @return {@code true} once {@link #closeStream(boolean)} has ended this stream */
    private boolean isEndStream() {
        return endStream.get();
    }

    /** @return the identifier assigned to this stream at construction */
    public String getReqId() {
        return reqId;
    }

    /**
     * Builds a discover request for the task's service (name, namespace and the handler's current
     * revision) and writes it to the stream. The send is logged at INFO for a task of type
     * {@code FIRST} and at DEBUG otherwise.
     *
     * @param serviceUpdateTask task describing which service event to query
     */
    public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
        final ServiceEventKey eventKey = serviceUpdateTask.getServiceEventKey();
        final ServiceKey serviceKey = eventKey.getServiceKey();

        final ServiceProto.Service.Builder service = ServiceProto.Service.newBuilder();
        service.setName(stringValueOf(serviceKey.getService()));
        service.setNamespace(stringValueOf(serviceKey.getNamespace()));
        service.setRevision(stringValueOf(serviceUpdateTask.getEventHandler().getRevision()));

        final RequestProto.DiscoverRequest.Builder request = RequestProto.DiscoverRequest.newBuilder();
        request.setType(GrpcUtil.buildDiscoverRequestType(eventKey.getEventType()));
        request.setService(service);

        final boolean firstQuery = serviceUpdateTask.getTaskType() == ServiceUpdateTaskConstant.Type.FIRST;
        if (firstQuery) {
            LOG.info("[ServerConnector]send request(id={}) to {} for service {}", reqId, connection.getConnID(), eventKey);
        } else {
            LOG.debug("[ServerConnector]send request(id={}) to {} for service {}", reqId, connection.getConnID(), eventKey);
        }
        discoverClient.onNext(request.build());
    }

    /** Wraps a plain string in a protobuf {@code StringValue} message. */
    private static StringValue stringValueOf(String raw) {
        return StringValue.newBuilder().setValue(raw).build();
    }

    /**
     * Checks a discover response and classifies it.
     *
     * <ul>
     *   <li>A response whose event type is not {@code SERVICE} and whose service namespace or name
     *       is empty yields {@code INVALID_SERVER_RESPONSE} with no service event key.</li>
     *   <li>An {@code UNKNOWN} event type yields {@code INVALID_SERVER_RESPONSE} with no service event key.</li>
     *   <li>A response code that maps to {@code SERVER_ERROR} yields that code together with the key.</li>
     *   <li>Everything else, including other mapped codes, is reported as {@code Success}.</li>
     * </ul>
     *
     * @param discoverResponse response received from the server
     * @return the validation outcome; its error code is {@code Success} when the response may be dispatched
     */
    private ValidResult validMessage(ResponseProto.DiscoverResponse discoverResponse) {
        ErrorCode errorCode = ErrorCode.Success;
        if (discoverResponse.hasCode()) {
            errorCode = ServerCodes.convertServerErrorToRpcError(discoverResponse.getCode().getValue());
        }
        ServiceProto.Service service = discoverResponse.getService();
        ServiceEventKey.EventType eventType = GrpcUtil.buildEventType(discoverResponse.getType());

        boolean serviceMissing = StringUtils.isEmpty(service.getNamespace().getValue())
                || StringUtils.isEmpty(service.getName().getValue());
        if (!eventType.equals(ServiceEventKey.EventType.SERVICE) && serviceMissing) {
            return new ValidResult(null, ErrorCode.INVALID_SERVER_RESPONSE, "service is empty, response text is " + discoverResponse.toString());
        }
        if (eventType == ServiceEventKey.EventType.UNKNOWN) {
            return new ValidResult(null, ErrorCode.INVALID_SERVER_RESPONSE, "invalid event type " + discoverResponse.getType());
        }

        ServiceEventKey serviceEventKey = new ServiceEventKey(new ServiceKey(service.getNamespace().getValue(), service.getName().getValue()), eventType);
        if (errorCode == ErrorCode.SERVER_ERROR) {
            // Report the actual failure; the event type has already been validated above.
            return new ValidResult(serviceEventKey, errorCode, "server error, code " + discoverResponse.getCode().getValue());
        }
        return new ValidResult(serviceEventKey, ErrorCode.Success, "");
    }

    /**
     * Handles a failed response or a stream-level failure: ends the stream without completing the
     * request side, reports the failure on the connection, notifies the affected pending tasks
     * with a network-error event and finally asks them to retry.
     *
     * <p>If the result names a service event, only that pending task is affected; otherwise every
     * pending task is notified and the pending map is cleared. Notification happens under the
     * client lock, the retries after it has been released.
     *
     * @param validResult description of the failure; messages containing "EOF" are logged at DEBUG, others at ERROR
     */
    public void exceptionCallback(ValidResult validResult) {
        closeStream(false);

        final boolean endOfStream = validResult.getMessage().contains("EOF");
        if (endOfStream) {
            LOG.debug("[ServerConnector]exceptionCallback: errCode {}, info {}, serviceEventKey {}", validResult.getErrorCode(), validResult.getMessage(), validResult.getServiceEventKey());
        } else {
            LOG.error("[ServerConnector]exceptionCallback: errCode {}, info {}, serviceEventKey {}", validResult.getErrorCode(), validResult.getMessage(), validResult.getServiceEventKey());
        }
        this.connection.reportFail(validResult.getErrorCode());

        final List<ServiceUpdateTask> toRetry = new ArrayList<>();
        synchronized (this.clientLock) {
            final ServiceEventKey failedKey = validResult.getServiceEventKey();
            if (failedKey != null) {
                final ServiceUpdateTask affected = removePendingTask(failedKey);
                if (affected != null) {
                    toRetry.add(affected);
                    notifyQueryFailure(affected, validResult);
                }
            } else if (CollectionUtils.isNotEmpty(this.pendingTask.values())) {
                toRetry.addAll(listPendingTasks());
                for (ServiceUpdateTask affected : toRetry) {
                    notifyQueryFailure(affected, validResult);
                }
                this.pendingTask.clear();
            }
        }
        for (ServiceUpdateTask affected : toRetry) {
            affected.retry();
        }
    }

    /** Delivers a network-error server event describing the given failure to one task. */
    private void notifyQueryFailure(ServiceUpdateTask task, ValidResult validResult) {
        final String detail = String.format("[ServerConnector]code %s, fail to query service %s from server(%s): %s", validResult.getErrorCode(), task.getServiceEventKey(), this.connection.getConnID(), validResult.getMessage());
        task.notifyServerEvent(new ServerEvent(task.getServiceEventKey(), (Object) null, ServerErrorResponseException.build(ErrorCode.NETWORK_ERROR.getCode(), detail)));
    }

    /**
     * Receives one discover response: records the receive time, validates the response, and on
     * success hands it to the matching pending task. An invalid response is routed to
     * {@link #exceptionCallback(ValidResult)}; a response with no matching pending task is logged
     * and dropped.
     *
     * @param discoverResponse response received from the server
     */
    public void onNext(ResponseProto.DiscoverResponse discoverResponse) {
        lastRecvTimeMs.set(System.currentTimeMillis());

        final ValidResult checked = validMessage(discoverResponse);
        if (checked.errorCode != ErrorCode.Success) {
            exceptionCallback(checked);
            return;
        }

        final ServiceProto.Service service = discoverResponse.getService();
        final ServiceKey serviceKey = new ServiceKey(service.getNamespace().getValue(), service.getName().getValue());
        final ServiceEventKey eventKey = new ServiceEventKey(serviceKey, GrpcUtil.buildEventType(discoverResponse.getType()));

        final ServiceUpdateTask task;
        synchronized (clientLock) {
            task = removePendingTask(eventKey);
        }
        if (task == null) {
            LOG.warn("[ServerConnector]callback not found for:{}", TextFormat.shortDebugString(service));
            return;
        }

        if (task.getTaskType() == ServiceUpdateTaskConstant.Type.FIRST) {
            LOG.info("[ServerConnector]request(id={}) receive response for {}", getReqId(), eventKey);
        } else {
            LOG.debug("[ServerConnector]request(id={}) receive response for {}", getReqId(), eventKey);
        }

        // When the notification returns false, the task is put back into the update set.
        final boolean notified = task.notifyServerEvent(new ServerEvent(eventKey, discoverResponse, (PolarisException) null));
        if (!notified) {
            task.addUpdateTaskSet();
        }
    }

    /**
     * Stream-level error from the server: forwarded to {@link #exceptionCallback(ValidResult)} as a
     * network error that is not tied to any single service event.
     *
     * @param th the error delivered on the stream
     */
    public void onError(Throwable th) {
        final String streamId = getReqId();
        final String server = this.connection.getConnID().toString();
        final String detail = String.format("stream %s received error from server(%s), error is %s", streamId, server, th.getMessage());
        exceptionCallback(new ValidResult(null, ErrorCode.NETWORK_ERROR, detail));
    }

    /**
     * The server completed the stream: forwarded to {@link #exceptionCallback(ValidResult)} as a
     * network error whose message contains "EOF", not tied to any single service event.
     */
    public void onCompleted() {
        final String streamId = getReqId();
        final String server = this.connection.getConnID().toString();
        final String detail = String.format("stream %s EOF by server(%s)", streamId, server);
        exceptionCallback(new ValidResult(null, ErrorCode.NETWORK_ERROR, detail));
    }

    /** Runs the idle-expiry check while holding the client lock; the check's result is discarded. */
    public void syncCloseExpireStream() {
        synchronized (clientLock) {
            closeExpireStream();
        }
    }

    /**
     * Closes the stream, completing the request side, when it has been idle for at least the
     * configured timeout. Idle time is measured from the last received response, or from
     * construction if nothing has been received yet.
     *
     * @return {@code true} if the idle timeout was reached and a close was issued, {@code false} otherwise
     */
    private boolean closeExpireStream() {
        final long lastRecv = lastRecvTimeMs.get();
        final long now = System.currentTimeMillis();
        final long idleSince = (lastRecv == 0) ? createTimeMs : lastRecv;
        if (now - idleSince >= connectionIdleTimeoutMs) {
            closeStream(true);
            return true;
        }
        return false;
    }

    /**
     * Decides whether this stream can carry the given task and, if so, registers the task as pending.
     *
     * <p>The stream is unavailable when it has ended, when its connection is not available, or
     * when the idle-expiry check closes it. If a task for the same service event is already
     * pending it is replaced and a warning is logged.
     *
     * @param serviceUpdateTask task that wants to use this stream
     * @return {@code true} if the task was registered on this stream, {@code false} if the stream cannot be used
     */
    public boolean checkAvailable(ServiceUpdateTask serviceUpdateTask) {
        // Cheap check without the lock first.
        if (isEndStream() || !Connection.isAvailableConnection(this.connection)) {
            return false;
        }
        synchronized (this.clientLock) {
            // Re-check under the lock: the stream may have been ended in the meantime.
            if (isEndStream()) {
                return false;
            }
            if (closeExpireStream()) {
                return false;
            }
            SpecTask previous = this.pendingTask.get(serviceUpdateTask.getServiceEventKey());
            if (null != previous) {
                // Log the wrapped task: the SpecTask wrapper itself has no readable string form.
                LOG.warn("[ServerConnector]pending task {} has been overwritten", previous.task);
            }
            putPendingTask(serviceUpdateTask);
            return true;
        }
    }

    /** @return a new list holding the update task of every entry currently in the pending map */
    private List<ServiceUpdateTask> listPendingTasks() {
        final List<ServiceUpdateTask> tasks = new ArrayList<>();
        for (SpecTask pending : this.pendingTask.values()) {
            tasks.add(pending.task);
        }
        return tasks;
    }

    /**
     * Registers a task as pending under its service event key, stamping it with the current time.
     * An existing entry for the same key is replaced. This method does not acquire the client lock itself.
     *
     * @param serviceUpdateTask task to register
     */
    public void putPendingTask(ServiceUpdateTask serviceUpdateTask) {
        ServiceEventKey serviceEventKey = serviceUpdateTask.getServiceEventKey();
        LOG.debug("Put {} to pending task map. reqId: {}", serviceEventKey, this.reqId);
        this.pendingTask.put(serviceEventKey, new SpecTask(serviceUpdateTask));
    }

    /**
     * Removes the pending entry for the given service event, if any. This method does not acquire
     * the client lock itself.
     *
     * @param serviceEventKey key of the entry to remove
     * @return the task that was pending under the key, or {@code null} if there was none
     */
    public ServiceUpdateTask removePendingTask(ServiceEventKey serviceEventKey) {
        LOG.debug("Remove {} from pending task map. reqId: {}", serviceEventKey, this.reqId);
        SpecTask removed = this.pendingTask.remove(serviceEventKey);
        return removed == null ? null : removed.task;
    }
}
