package org.apache.shenyu.plugin.tencent.cls.client;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
import com.tencentcloudapi.cls.producer.AsyncProducerConfig;
import com.tencentcloudapi.cls.producer.Result;
import com.tencentcloudapi.cls.producer.common.LogItem;
import com.tencentcloudapi.cls.producer.errors.LogSizeTooLargeException;
import com.tencentcloudapi.cls.producer.errors.MaxBatchCountExceedException;
import com.tencentcloudapi.cls.producer.errors.ProducerException;
import com.tencentcloudapi.cls.producer.errors.ResultFailedException;
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.tencent.cls.config.TencentLogCollectConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

/* loaded from: input_file:org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.class */
public class TencentClsLogCollectClient extends AbstractLogConsumeClient<TencentLogCollectConfig.TencentClsLogConfig, ShenyuRequestLog> {
    private AsyncProducerClient client;
    private String topic;
    private ThreadPoolExecutor threadExecutor;

    /* loaded from: input_file:org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient$ProducerFutureCallback.class */
    private static final class ProducerFutureCallback implements FutureCallback<Result> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFutureCallback.class);
        private final String topic;

        ProducerFutureCallback(String str) {
            this.topic = str;
        }

        public void onSuccess(@Nullable Result result) {
            LOGGER.info("Send logs to Tencent cls successfully.");
        }

        public void onFailure(Throwable th) {
            if (!(th instanceof ResultFailedException)) {
                LOGGER.error("Failed to send log, e={}", th.getMessage());
            } else {
                LOGGER.error("Failed to send logs, topic={}, result={}", this.topic, ((ResultFailedException) th).getResult());
            }
        }
    }

    public void initClient0(@NonNull TencentLogCollectConfig.TencentClsLogConfig tencentClsLogConfig) {
        String secretId = tencentClsLogConfig.getSecretId();
        String secretKey = tencentClsLogConfig.getSecretKey();
        String endpoint = tencentClsLogConfig.getEndpoint();
        this.topic = tencentClsLogConfig.getTopic();
        if (StringUtils.isBlank(secretId) || StringUtils.isBlank(secretKey) || StringUtils.isBlank(this.topic) || StringUtils.isBlank(endpoint)) {
            LOG.error("init Tencent cls client error, please check secretId, secretKey, topic or host");
            return;
        }
        AsyncProducerConfig asyncProducerConfig = new AsyncProducerConfig(endpoint, secretId, secretKey, NetworkUtils.getLocalMachineIP());
        Optional map = Optional.ofNullable(tencentClsLogConfig.getTotalSizeInBytes()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map.ifPresent((v1) -> {
            r1.setTotalSizeInBytes(v1);
        });
        Optional map2 = Optional.ofNullable(tencentClsLogConfig.getMaxSendThreadCount()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map2.ifPresent((v1) -> {
            r1.setSendThreadCount(v1);
        });
        Optional map3 = Optional.ofNullable(tencentClsLogConfig.getMaxBlockSec()).map(Long::valueOf);
        asyncProducerConfig.getClass();
        map3.ifPresent((v1) -> {
            r1.setMaxBlockMs(v1);
        });
        Optional map4 = Optional.ofNullable(tencentClsLogConfig.getMaxBatchSize()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map4.ifPresent((v1) -> {
            r1.setBatchSizeThresholdInBytes(v1);
        });
        Optional map5 = Optional.ofNullable(tencentClsLogConfig.getMaxBatchCount()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map5.ifPresent((v1) -> {
            r1.setBatchCountThreshold(v1);
        });
        Optional map6 = Optional.ofNullable(tencentClsLogConfig.getLingerMs()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map6.ifPresent((v1) -> {
            r1.setLingerMs(v1);
        });
        Optional map7 = Optional.ofNullable(tencentClsLogConfig.getRetries()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map7.ifPresent((v1) -> {
            r1.setRetries(v1);
        });
        Optional map8 = Optional.ofNullable(tencentClsLogConfig.getMaxReservedAttempts()).map(Integer::valueOf);
        asyncProducerConfig.getClass();
        map8.ifPresent((v1) -> {
            r1.setMaxReservedAttempts(v1);
        });
        Optional map9 = Optional.ofNullable(tencentClsLogConfig.getBaseRetryBackoffMs()).map(Long::valueOf);
        asyncProducerConfig.getClass();
        map9.ifPresent((v1) -> {
            r1.setBaseRetryBackoffMs(v1);
        });
        Optional map10 = Optional.ofNullable(tencentClsLogConfig.getMaxRetryBackoffMs()).map(Long::valueOf);
        asyncProducerConfig.getClass();
        map10.ifPresent((v1) -> {
            r1.setMaxRetryBackoffMs(v1);
        });
        this.threadExecutor = createThreadPoolExecutor(tencentClsLogConfig.getSendThreadCount().intValue());
        try {
            this.client = new AsyncProducerClient(asyncProducerConfig);
        } catch (Exception e) {
            LOG.warn("TencentClsLogCollectClient initClient error message:{}", e.getMessage());
        }
    }

    public void consume0(@NonNull List<ShenyuRequestLog> list) {
        list.forEach(this::sendLog);
    }

    public void close0() {
        if (Objects.nonNull(this.client)) {
            try {
                this.client.close();
            } catch (InterruptedException | ProducerException e) {
                LOG.error("Close producer error.");
            }
        }
    }

    private void sendLog(ShenyuRequestLog shenyuRequestLog) {
        ArrayList arrayList = new ArrayList();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", shenyuRequestLog.getRequestUri());
        logItem.PushBack("message", GsonUtils.getGson().toJson(shenyuRequestLog));
        arrayList.add(logItem);
        try {
            Futures.addCallback(this.client.putLogs(this.topic, arrayList, result -> {
            }), new ProducerFutureCallback(this.topic), this.threadExecutor);
        } catch (InterruptedException e) {
            LOG.warn("The current thread has been interrupted during send logs.");
        } catch (Exception e2) {
            if (e2 instanceof MaxBatchCountExceedException) {
                LOG.error("The logs exceeds the maximum batch count, e={}", e2.getMessage());
            } else if (e2 instanceof LogSizeTooLargeException) {
                LOG.error("The size of log is larger than the maximum allowable size, e={}", e2.getMessage());
            } else {
                LOG.error("Failed to send logs, e={}", e2.getMessage());
            }
        }
    }

    private static ThreadPoolExecutor createThreadPoolExecutor(int i) {
        int i2 = i;
        if (i2 > GenericLoggingConstant.MAX_ALLOW_THREADS.intValue()) {
            LOG.warn("send thread count number too large!");
            i2 = GenericLoggingConstant.MAX_ALLOW_THREADS.intValue();
        }
        return new ThreadPoolExecutor(i2, GenericLoggingConstant.MAX_ALLOW_THREADS.intValue(), 60000L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue(GenericLoggingConstant.MAX_QUEUE_NUMBER.intValue()), ShenyuThreadFactory.create("shenyu-tencent-cls", true), new ThreadPoolExecutor.AbortPolicy());
    }
}
