package com.aliyun.lindorm.tsdb.client.impl;

import com.alibaba.fastjson.JSON;
import com.aliyun.lindorm.tsdb.client.Callback;
import com.aliyun.lindorm.tsdb.client.Cancellable;
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.CodecType;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.codec.WriteCodecFactory;
import com.aliyun.lindorm.tsdb.client.exception.ClientException;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.ErrorResult;
import com.aliyun.lindorm.tsdb.client.model.Query;
import com.aliyun.lindorm.tsdb.client.model.QueryResult;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.Result;
import com.aliyun.lindorm.tsdb.client.model.ResultSet;
import com.aliyun.lindorm.tsdb.client.model.VersionInfo;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.moshi.JsonAdapter;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.moshi.Moshi;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.ConnectionSpec;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.OkHttpClient;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.RequestBody;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.ResponseBody;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okio.BufferedSource;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Call;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Response;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Retrofit;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.converter.moshi.MoshiConverterFactory;
import com.aliyun.lindorm.tsdb.client.utils.ComparableVersion;
import com.aliyun.lindorm.tsdb.client.utils.LockedBarrier;
import java.io.EOFException;
import java.io.IOException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/LindormTSDBClientImpl.class */
public class LindormTSDBClientImpl implements LindormTSDBClient {
    /** Options object handed to the constructor; kept for later look-ups such as the batch thread count. */
    private final ClientOptions options;
    /** Retrofit instance built against {@code ClientOptions.getUrl()} and backed by {@link #client}. */
    private final Retrofit retrofit;
    /** HTTP client configured in the constructor (auth interceptor, TLS settings, timeouts). */
    private final OkHttpClient client;
    /** Retrofit-generated proxy used for every HTTP call issued by this class. */
    private LindormTSDBService service;
    /** Receives records from the asynchronous {@code write} methods and is drained by flush/shutdown. */
    private BatchProcessor batchProcessor;
    /** One sender per configured batch thread; each is started on its own daemon thread by the constructor. */
    private RecordBatchSender[] batchSenders;
    /** Flipped to {@code true} exactly once by {@link #shutdown()}; consulted by {@link #flush()}. */
    private AtomicBoolean closed = new AtomicBoolean();
    /** Shared with the batch processor and every sender; its locking semantics are defined in LockedBarrier. */
    private final LockedBarrier barrier = new LockedBarrier();
    /** Strategy used by the chunked asynchronous query to decode a streamed response body. */
    private final ChunkProccesor chunkProccesor;
    /** Moshi adapter that decodes {@link QueryResult} objects from JSON. */
    private final JsonAdapter<QueryResult> adapter;
    /** Chunk size applied by {@code query(String, String)} when the caller does not pass one. */
    private final int queryChunkSize;
    /** Database used by the overloads that do not take an explicit database argument. */
    private final String defaultDatabase;
    /** Enum constant name of the configured schema policy, sent with synchronous writes. */
    private final String schemaPolicy;
    /** Codec used to encode synchronous write payloads and to decode their error bodies. */
    private final CodecType codecType;
    private static final Logger LOG = LoggerFactory.getLogger(LindormTSDBClientImpl.class);
    /** Oldest server version accepted by {@code verifyServerVersion}. */
    private static final ComparableVersion MIN_SERVER_VERSION = new ComparableVersion("3.4.8");

    /* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/LindormTSDBClientImpl$ChunkProccesor.class */
    /**
     * Strategy for consuming the body of a chunked query response.
     *
     * <p>The arguments are, in order: the response body to decode, the handle that lets the
     * result callback cancel the call, the callback receiving each decoded result, and a
     * completion hook. The exact invocation rules (when the hook runs, who closes the body)
     * are decided by the implementation.
     */
    private interface ChunkProccesor {
        void process(ResponseBody responseBody, Cancellable cancellable, BiConsumer<Cancellable, QueryResult> biConsumer, Runnable runnable) throws IOException;
    }

    /* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/LindormTSDBClientImpl$HostNameVerifierImpl.class */
    /**
     * Hostname verifier that accepts every host.
     *
     * <p>SECURITY NOTE(review): because {@link #verify} unconditionally returns {@code true},
     * the server certificate is never matched against the host being contacted. The constructor
     * of the enclosing class installs this verifier on every client, so HTTPS connections are
     * open to man-in-the-middle attacks. Confirm this is intentional before relying on TLS
     * for authentication of the server.
     */
    public static class HostNameVerifierImpl implements HostnameVerifier {
        /**
         * Always reports the host as verified; neither argument is inspected.
         *
         * @param str host name being connected to (ignored)
         * @param sSLSession TLS session of the connection (ignored)
         * @return always {@code true}
         */
        @Override // javax.net.ssl.HostnameVerifier
        public boolean verify(String str, SSLSession sSLSession) {
            return true;
        }
    }

    /* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/LindormTSDBClientImpl$JSONChunkProccesor.class */
    /**
     * Decodes a chunked query response as a sequence of JSON-encoded {@link QueryResult}
     * documents and forwards each one to the caller-supplied callback.
     */
    private class JSONChunkProccesor implements ChunkProccesor {
        private final JsonAdapter<QueryResult> adapter;

        public JSONChunkProccesor(JsonAdapter<QueryResult> jsonAdapter) {
            this.adapter = jsonAdapter;
        }

        /**
         * Reads results from {@code responseBody} until the stream ends or the call is cancelled.
         *
         * <p>Every decoded, non-null result with code {@code 0} is passed to {@code biConsumer}.
         * When the end of the stream is reached (or there is no body at all) one empty
         * {@link QueryResult} is delivered as a terminator and, unless the call has been
         * cancelled, {@code runnable} is run. The body is always closed before returning.
         *
         * @param responseBody body to decode; may be {@code null}
         * @param cancellable handle used to observe and request cancellation
         * @param biConsumer receives each result together with the cancellation handle
         * @param runnable completion hook, skipped when the call was cancelled
         * @throws IOException if reading from the body fails for a reason other than end of stream
         * @throws LindormTSDBException if a decoded result carries a non-zero code
         */
        @Override // com.aliyun.lindorm.tsdb.client.impl.LindormTSDBClientImpl.ChunkProccesor
        public void process(ResponseBody responseBody, Cancellable cancellable, BiConsumer<Cancellable, QueryResult> biConsumer, Runnable runnable) throws IOException {
            if (responseBody == null) {
                // Nothing to close here, so keep this path outside the try/finally below:
                // a throwing callback must surface as-is, not be masked by a NullPointerException.
                biConsumer.accept(cancellable, new QueryResult());
                if (!cancellable.isCanceled()) {
                    runnable.run();
                }
                return;
            }
            try {
                BufferedSource source = responseBody.source();
                while (!cancellable.isCanceled()) {
                    QueryResult chunk = this.adapter.fromJson(source);
                    if (chunk == null) {
                        continue;
                    }
                    if (chunk.getCode() != 0) {
                        throw new LindormTSDBException(chunk.getCode(), chunk.getSqlstate(), chunk.getMessage());
                    }
                    biConsumer.accept(cancellable, chunk);
                }
            } catch (EOFException endOfStream) {
                // End of the chunk stream is the normal termination signal, not an error.
                biConsumer.accept(cancellable, new QueryResult());
                if (!cancellable.isCanceled()) {
                    runnable.run();
                }
            } finally {
                responseBody.close();
            }
        }
    }

    /* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/LindormTSDBClientImpl$TrustManagerImpl.class */
    /**
     * Trust manager that performs no certificate validation.
     *
     * <p>SECURITY NOTE(review): both check methods are empty, so any certificate chain
     * (expired, self-signed, issued for another host) is accepted. The enclosing class
     * installs this manager on every client it builds. Confirm this is intentional.
     */
    public static class TrustManagerImpl implements X509TrustManager, TrustManager {
        /** Accepts any client certificate chain; never throws. */
        @Override // javax.net.ssl.X509TrustManager
        public void checkClientTrusted(X509Certificate[] x509CertificateArr, String str) throws CertificateException {
        }

        /** Accepts any server certificate chain; never throws. */
        @Override // javax.net.ssl.X509TrustManager
        public void checkServerTrusted(X509Certificate[] x509CertificateArr, String str) throws CertificateException {
        }

        /** @return a fresh empty array, i.e. no issuer is advertised as trusted */
        @Override // javax.net.ssl.X509TrustManager
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    }

    /**
     * Builds a ready-to-use client: configures the HTTP stack, checks that the server is
     * recent enough and starts the background batch sender threads.
     *
     * <p>The constructor performs a blocking version request against the server and fails
     * if that request fails or the reported version is below the supported minimum.
     *
     * @param clientOptions connection, authentication, timeout and batching settings
     * @throws ClientException if the server version cannot be fetched or is too old
     */
    public LindormTSDBClientImpl(ClientOptions clientOptions) {
        this.options = clientOptions;
        final String user = clientOptions.getUsername();
        final String secret = clientOptions.getPassword();
        this.queryChunkSize = clientOptions.getQueryChunkSize();
        this.defaultDatabase = clientOptions.getDefaultDatabase();
        this.schemaPolicy = clientOptions.getSchemaPolicy().name();
        this.codecType = clientOptions.getCodecType();

        // --- HTTP client ---------------------------------------------------------------
        OkHttpClient.Builder http = new OkHttpClient.Builder();
        // Basic auth is only attached when both halves of the credential are present.
        if (user != null && password(secret)) {
            http.addInterceptor(new BasicAuthInterceptor(user, secret));
        }
        http.connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS, ConnectionSpec.CLEARTEXT))
                .sslSocketFactory(socketFactory(), new TrustManagerImpl())
                .hostnameVerifier(new HostNameVerifierImpl())
                .connectTimeout(clientOptions.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
                .callTimeout(clientOptions.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)
                .writeTimeout(clientOptions.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)
                .readTimeout(clientOptions.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        MoshiConverterFactory converters = MoshiConverterFactory.create();
        this.client = http.build();

        // --- Retrofit service and JSON decoding ------------------------------------------
        this.retrofit = new Retrofit.Builder()
                .baseUrl(clientOptions.getUrl())
                .client(this.client)
                .addConverterFactory(converters)
                .build();
        this.adapter = new Moshi.Builder().build().adapter(QueryResult.class);
        this.chunkProccesor = new JSONChunkProccesor(this.adapter);
        this.service = (LindormTSDBService) this.retrofit.create(LindormTSDBService.class);
        this.batchProcessor = new BatchProcessor(clientOptions, this.barrier);

        // --- Server compatibility check (blocking network call) ------------------------
        VersionInfo reported = getServerVersion();
        LOG.info("The server version is {}", reported.getVersion());
        verifyServerVersion(reported.getVersion());

        // --- Background senders: one daemon thread per configured batch thread ---------
        final int senderCount = this.options.getNumBatchThreads();
        this.batchSenders = new RecordBatchSender[senderCount];
        for (int index = 0; index < senderCount; index++) {
            RecordBatchSender sender = new RecordBatchSender(clientOptions, this.service, this.batchProcessor, this.barrier);
            this.batchSenders[index] = sender;
            Thread worker = new Thread(sender, "lindorm-tsdb-batch-sender-" + index);
            worker.setDaemon(true);
            worker.start();
        }
    }

    /** Tells whether a password was supplied at all. */
    private static boolean password(String candidate) {
        return candidate != null;
    }

    /**
     * Rejects servers older than {@code MIN_SERVER_VERSION}.
     *
     * @param str version string reported by the server
     * @throws ClientException if the reported version sorts below the minimum
     */
    private void verifyServerVersion(String str) {
        ComparableVersion reported = new ComparableVersion(str);
        boolean supported = MIN_SERVER_VERSION.compareTo(reported) <= 0;
        if (supported) {
            return;
        }
        throw new ClientException("The server version is lower than " + MIN_SERVER_VERSION + ", the client is not supported. Please upgrade the server version.");
    }

    /**
     * Queues a single record for asynchronous writing to the default database.
     *
     * @param record record to write; must not be {@code null}
     * @return future produced by the batch processor for this write
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public CompletableFuture<WriteResult> write(Record record) {
        return write(this.defaultDatabase, record);
    }

    /**
     * Queues a list of records for asynchronous writing to the default database.
     *
     * @param list records to write; must be neither {@code null} nor empty
     * @return future produced by the batch processor for this write
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public CompletableFuture<WriteResult> write(List<Record> list) {
        return write(this.defaultDatabase, list);
    }

    /**
     * Queues a single record for the default database and hands the resulting future,
     * together with the record wrapped in a one-element list, to {@code transfer} so the
     * callback can be notified.
     *
     * @param record record to write; must not be {@code null}
     * @param callback callback passed on to {@code transfer}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void write(Record record, Callback callback) {
        // The write is submitted first so a null record fails before the list is built.
        CompletableFuture<WriteResult> pending = write(this.defaultDatabase, record);
        List<Record> submitted = Collections.singletonList(record);
        transfer(pending, submitted, callback);
    }

    /**
     * Queues a list of records for the default database and hands the resulting future
     * and the same list to {@code transfer} so the callback can be notified.
     *
     * @param list records to write; must be neither {@code null} nor empty
     * @param callback callback passed on to {@code transfer}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void write(List<Record> list, Callback callback) {
        CompletableFuture<WriteResult> pending = write(this.defaultDatabase, list);
        transfer(pending, list, callback);
    }

    /**
     * Queues a single record for asynchronous writing to the given database.
     *
     * @param str target database name; must not be {@code null}
     * @param record record to write; must not be {@code null}
     * @return future produced by the batch processor for this write
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public CompletableFuture<WriteResult> write(String str, Record record) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(record);
        // The submission timestamp is taken after validation, right before appending.
        final long submittedAt = System.currentTimeMillis();
        CompletableFuture<WriteResult> pending = this.batchProcessor.append(str, record, submittedAt);
        return pending;
    }

    /**
     * Queues a list of records for asynchronous writing to the given database.
     *
     * @param str target database name; must not be {@code null}
     * @param list records to write; must be neither {@code null} nor empty
     * @return future produced by the batch processor for this write
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws IllegalArgumentException if {@code list} is {@code null} or empty
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public CompletableFuture<WriteResult> write(String str, List<Record> list) {
        Objects.requireNonNull(str);
        final boolean hasRecords = list != null && !list.isEmpty();
        if (!hasRecords) {
            throw new IllegalArgumentException("The records must be not null or empty.");
        }
        final long submittedAt = System.currentTimeMillis();
        return this.batchProcessor.append(str, list, submittedAt);
    }

    /**
     * Synchronously writes records to the default database with no extra parameters.
     *
     * @param list records to write
     * @return the result of the synchronous write
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public WriteResult writeSync(List<Record> list) {
        return writeSync(this.defaultDatabase, list, Collections.emptyMap());
    }

    /**
     * Synchronously writes records to the default database.
     *
     * @param list records to write
     * @param map extra parameters forwarded with the write request; {@code null} is treated as empty
     * @return the result of the synchronous write
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public WriteResult writeSync(List<Record> list, Map<String, String> map) {
        return writeSync(this.defaultDatabase, list, map);
    }

    /**
     * Synchronously writes records to the given database with no extra parameters.
     *
     * @param str target database name; must not be {@code null}
     * @param list records to write
     * @return the result of the synchronous write
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public WriteResult writeSync(String str, List<Record> list) {
        return writeSync(str, list, Collections.emptyMap());
    }

    /**
     * Encodes the records with the configured codec and writes them in a single blocking
     * HTTP request, bypassing the batch processor.
     *
     * @param str target database name; must not be {@code null}
     * @param list records to encode and send
     * @param map extra parameters forwarded with the request; {@code null} is treated as empty
     * @return {@code WriteResult.success()} when the server answers with a successful status
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws LindormTSDBException if the server rejects the write; the exception is the one
     *         decoded from the error body, or a generic "unknown" error for non-successful
     *         statuses below 400
     * @throws ClientException if the request fails locally or the error body cannot be
     *         read or decoded
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public WriteResult writeSync(String str, List<Record> list, Map<String, String> map) {
        Objects.requireNonNull(str);
        if (map == null) {
            map = Collections.emptyMap();
        }
        try {
            Response<ResponseBody> execute = this.service.write(str, this.codecType.name(), this.schemaPolicy, map, RequestBody.create(WriteCodecFactory.encode(list, this.codecType), BatchProcessor.MEDIA_TYPE_STRING)).execute();
            if (execute.isSuccessful()) {
                return WriteResult.success();
            }
            int code = execute.code();
            if (code < 400) {
                throw new LindormTSDBException(code, "unknown", "Unknown error.");
            }
            try {
                byte[] bytes = execute.errorBody().bytes();
                LOG.error("Failed to send points. {}", bytes);
                throw RecordBatchSender.convert(bytes, this.codecType);
            } catch (LindormTSDBException serverError) {
                // A successfully decoded server error is the expected outcome here: let it
                // through untouched instead of reporting it as a parse failure.
                throw serverError;
            } catch (Exception e) {
                LOG.error("Failed to parse response body", e);
                // Rethrown as-is; the outer handler wraps it in ClientException exactly once.
                throw e;
            }
        } catch (LindormTSDBException e2) {
            throw e2;
        } catch (Exception e3) {
            throw new ClientException(e3);
        }
    }

    /**
     * Sends every batch currently queued in the batch processor using the first available
     * sender.
     *
     * @throws IllegalStateException if the client has been shut down, or if no sender exists
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void flush() {
        if (this.closed.get()) {
            throw new IllegalStateException("Lindorm TSDB already closed.");
        }
        this.batchProcessor.startForceFlushing();
        RecordBatchSender chosen = null;
        try {
            // Any sender will do; take the first slot that is populated.
            for (RecordBatchSender candidate : this.batchSenders) {
                if (candidate != null) {
                    chosen = candidate;
                    break;
                }
            }
        } finally {
            // NOTE(review): force-flushing is marked finished before the queued batches are
            // actually drained and sent below - confirm this ordering is intended.
            this.batchProcessor.finishForceFlushing();
        }
        if (chosen == null) {
            throw new IllegalStateException("RecordBatchSender is null, can't flush.");
        }
        doFlushAllBatchesInQueue(chosen);
    }

    /**
     * Drains everything queued in the batch processor and sends it through the given sender.
     * Does nothing when the drain yields no batches.
     *
     * @param recordBatchSender sender used to transmit the drained batches
     */
    private void doFlushAllBatchesInQueue(RecordBatchSender recordBatchSender) {
        Map<String, List<RecordBatch>> drained = this.batchProcessor.drainAll();
        final boolean nothingQueued = drained == null || drained.isEmpty();
        if (nothingQueued) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("start to force flush records for {}", drained.keySet());
        }
        final long now = System.currentTimeMillis();
        recordBatchSender.sendPointRequests(drained, now, false);
    }

    /**
     * Runs a statement synchronously and returns its complete result.
     *
     * @param query statement to run; the query, its database and its command must be non-null
     * @return the decoded result, or an empty {@link QueryResult} when the response has no body
     * @throws NullPointerException if the query, its database or its command is {@code null}
     * @throws LindormTSDBException if the server answers with an error status
     * @throws ClientException if the request fails with an I/O error
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public Result execute(Query query) {
        Objects.requireNonNull(query);
        Objects.requireNonNull(query.getDatabase());
        Objects.requireNonNull(query.getCommand());
        try {
            Response<QueryResult> response = this.service.query(query.getDatabase(), RequestBody.create(BatchProcessor.MEDIA_TYPE_STRING, query.getCommand())).execute();
            if (response.isSuccessful()) {
                QueryResult payload = response.body();
                return payload != null ? payload : new QueryResult();
            }
            // Error statuses carry a JSON document describing the failure.
            ErrorResult failure = ErrorResult.fromJSON(response.errorBody().string());
            throw new LindormTSDBException(failure.getCode(), failure.getSqlstate(), failure.getMessage());
        } catch (IOException ioFailure) {
            throw new ClientException(ioFailure);
        }
    }

    /**
     * Runs a statement asynchronously and reports the outcome through one of two callbacks.
     *
     * <p>On a successful status the decoded body (possibly {@code null}) is passed to
     * {@code consumer}. On an error status the error body is decoded into a
     * {@link LindormTSDBException}; if the body cannot be read or decoded, a
     * {@link ClientException} wrapping the cause is reported instead. Transport failures are
     * reported as {@link ClientException} as well. In every failure case {@code consumer2}
     * is invoked exactly once.
     *
     * @param query statement to run; the query, its database and its command must be non-null
     * @param consumer receives the result of a successful request
     * @param consumer2 receives the failure of an unsuccessful request
     * @throws NullPointerException if the query, its database or its command is {@code null}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void query(Query query, final Consumer<QueryResult> consumer, final Consumer<Throwable> consumer2) {
        Objects.requireNonNull(query);
        Objects.requireNonNull(query.getDatabase());
        Objects.requireNonNull(query.getCommand());
        this.service.query(query.getDatabase(), RequestBody.create(BatchProcessor.MEDIA_TYPE_STRING, query.getCommand())).enqueue(new com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback<QueryResult>() { // from class: com.aliyun.lindorm.tsdb.client.impl.LindormTSDBClientImpl.1
            @Override // com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback
            public void onResponse(Call<QueryResult> call, Response<QueryResult> response) {
                if (response.isSuccessful()) {
                    consumer.accept(response.body());
                    return;
                }
                Throwable failure;
                try {
                    ErrorResult fromJSON = ErrorResult.fromJSON(response.errorBody().string());
                    failure = new LindormTSDBException(fromJSON.getCode(), fromJSON.getSqlstate(), fromJSON.getMessage());
                } catch (Exception e) {
                    // Covers I/O errors as well as an unreadable/unexpected error document.
                    // Without this the exception would escape the callback and the caller's
                    // error consumer would never be told that the query failed.
                    failure = new ClientException(e);
                }
                // Invoked outside the try so an exception thrown by the consumer itself is
                // not mistaken for a decoding failure and reported a second time.
                consumer2.accept(failure);
            }

            @Override // com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback
            public void onFailure(Call<QueryResult> call, Throwable th) {
                consumer2.accept(new ClientException(th));
            }
        });
    }

    /**
     * Runs a statement against the default database using the default chunk size.
     *
     * @param str statement text
     * @return a result set over the response, as produced by {@code query(String, String, int)}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public ResultSet query(String str) {
        return query(this.defaultDatabase, str);
    }

    /**
     * Runs a statement against the given database using the default chunk size.
     *
     * @param str database name
     * @param str2 statement text
     * @return a result set over the response, as produced by {@code query(String, String, int)}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public ResultSet query(String str, String str2) {
        return query(str, str2, this.queryChunkSize);
    }

    /**
     * Runs a statement synchronously and exposes the chunked response as a {@link ResultSet}.
     *
     * @param str database name
     * @param str2 statement text
     * @param i chunk size passed to the server
     * @return a result set reading from the response body, or {@code ResultSet.EMPTY} when
     *         the server answers 204 or sends no body
     * @throws ClientException on any failure; a server-reported error is wrapped as the cause
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public ResultSet query(String str, String str2, int i) {
        try {
            Response<ResponseBody> response = this.service.query(str, i, RequestBody.create(BatchProcessor.MEDIA_TYPE_STRING, str2)).execute();
            if (!response.isSuccessful()) {
                int status = response.code();
                String payload = response.errorBody().string();
                // Logged at error level, but only when debug logging is switched on.
                if (LOG.isDebugEnabled()) {
                    LOG.error("Encountered error while query points. errorCode: {}, message: {}", Integer.valueOf(status), payload);
                }
                ErrorResult failure = ErrorResult.fromJSON(payload);
                throw new LindormTSDBException(failure.getCode(), failure.getSqlstate(), failure.getMessage());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Read a chunked body");
            }
            // 204 means "no content": the body is not even looked at in that case.
            if (response.code() == 204) {
                return ResultSet.EMPTY;
            }
            ResponseBody content = response.body();
            if (content == null) {
                return ResultSet.EMPTY;
            }
            return new ResultSetImpl(this.adapter, content);
        } catch (Exception failure) {
            throw new ClientException(failure);
        }
    }

    /**
     * Fetches the server's version information with a blocking request.
     *
     * @return the version document returned by the server, parsed from JSON
     * @throws ClientException on any failure; a server-reported error is wrapped as the cause
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public VersionInfo getServerVersion() {
        try {
            Response<ResponseBody> response = this.service.version().execute();
            if (!response.isSuccessful()) {
                int status = response.code();
                String payload = response.errorBody().string();
                // Logged at error level, but only when debug logging is switched on.
                if (LOG.isDebugEnabled()) {
                    LOG.error("Encountered error while get server version. errorCode: {}, message: {}", Integer.valueOf(status), payload);
                }
                ErrorResult failure = ErrorResult.fromJSON(payload);
                throw new LindormTSDBException(failure.getCode(), failure.getSqlstate(), failure.getMessage());
            }
            String json = response.body().string();
            return (VersionInfo) JSON.parseObject(json, VersionInfo.class);
        } catch (Exception failure) {
            throw new ClientException(failure);
        }
    }

    /**
     * Probes the server's health endpoint with a blocking request.
     *
     * @return {@code true} when the endpoint answers with a successful status
     * @throws ClientException if the request itself fails
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public boolean isHealth() {
        final boolean healthy;
        try {
            healthy = this.service.health().execute().isSuccessful();
        } catch (Exception failure) {
            throw new ClientException(failure);
        }
        return healthy;
    }

    /**
     * Runs a statement asynchronously and streams the chunked response to the caller.
     *
     * <p>Successful responses are handed to the chunk processor, which feeds
     * {@code biConsumer} and eventually runs {@code runnable}. Server errors and processing
     * failures are passed to {@code consumer} when one is supplied. An I/O failure while
     * handling the response additionally delivers a {@link QueryResult} carrying the error
     * text to {@code biConsumer}.
     *
     * @param query statement to run; the query, its database and its command must be non-null
     * @param i chunk size passed to the server
     * @param biConsumer receives each result together with a handle that can cancel the call
     * @param runnable completion hook forwarded to the chunk processor
     * @param consumer receives failures; may be {@code null}
     * @throws NullPointerException if the query, its database or its command is {@code null}
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void query(Query query, int i, final BiConsumer<Cancellable, QueryResult> biConsumer, final Runnable runnable, final Consumer<Throwable> consumer) {
        Objects.requireNonNull(query);
        Objects.requireNonNull(query.getDatabase());
        Objects.requireNonNull(query.getCommand());
        this.service.query(query.getDatabase(), i, RequestBody.create(BatchProcessor.MEDIA_TYPE_STRING, query.getCommand())).enqueue(new com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback<ResponseBody>() { // from class: com.aliyun.lindorm.tsdb.client.impl.LindormTSDBClientImpl.2
            @Override // com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback
            public void onResponse(final Call<ResponseBody> call, Response<ResponseBody> response) {
                // Thin adapter exposing the underlying HTTP call's cancellation to the caller.
                final Cancellable handle = new Cancellable() { // from class: com.aliyun.lindorm.tsdb.client.impl.LindormTSDBClientImpl.2.1
                    @Override // com.aliyun.lindorm.tsdb.client.Cancellable
                    public boolean isCanceled() {
                        return call.isCanceled();
                    }

                    @Override // com.aliyun.lindorm.tsdb.client.Cancellable
                    public void cancel() {
                        call.cancel();
                    }
                };
                try {
                    if (!response.isSuccessful()) {
                        int status = response.code();
                        String payload = response.errorBody().string();
                        // Logged at error level, but only when debug logging is switched on.
                        if (LOG.isDebugEnabled()) {
                            LOG.error("Encountered error while query points. errorCode: {}, message: {}", Integer.valueOf(status), payload);
                        }
                        ErrorResult decoded = ErrorResult.fromJSON(payload);
                        LindormTSDBException serverError = new LindormTSDBException(decoded.getCode(), decoded.getSqlstate(), decoded.getMessage());
                        if (consumer != null) {
                            consumer.accept(serverError);
                            return;
                        }
                        // No error consumer: fall into the generic handler below, which
                        // logs the failure and cancels the call.
                        throw serverError;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Read a chunked body");
                    }
                    LindormTSDBClientImpl.this.chunkProccesor.process(response.body(), handle, biConsumer, runnable);
                } catch (IOException ioFailure) {
                    LOG.error("Encountered IO error while query points.", ioFailure);
                    // The result callback also learns about I/O trouble via an error-only result.
                    QueryResult errorResult = new QueryResult();
                    errorResult.setError(ioFailure.toString());
                    biConsumer.accept(handle, errorResult);
                    if (consumer != null) {
                        consumer.accept(ioFailure);
                    }
                } catch (Exception failure) {
                    LOG.error("Encountered error while query points.", failure);
                    call.cancel();
                    if (consumer != null) {
                        consumer.accept(failure);
                    }
                }
            }

            @Override // com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Callback
            public void onFailure(Call<ResponseBody> call, Throwable th) {
                if (consumer != null) {
                    consumer.accept(th);
                    return;
                }
                throw new ClientException(th);
            }
        });
    }

    /**
     * Shuts the client down: closes the batch processor, tells every sender to stop and
     * sends whatever is still queued through the first available sender.
     *
     * <p>Only the first call has an effect; later calls log a warning and return.
     */
    @Override // com.aliyun.lindorm.tsdb.client.LindormTSDBClient
    public void shutdown() {
        final boolean firstCall = this.closed.compareAndSet(false, true);
        if (!firstCall) {
            LOG.warn("Lindorm TSDB already closed.");
            return;
        }
        LOG.debug("Beginning shutdown of Lindorm TSDB Client.");
        this.batchProcessor.close();

        // Ask every sender loop to stop before the final drain.
        for (RecordBatchSender sender : this.batchSenders) {
            if (sender != null) {
                sender.setRunning(false);
            }
        }

        // One sender is enough to push out the remaining batches.
        for (RecordBatchSender sender : this.batchSenders) {
            if (sender != null) {
                doFlushAllBatchesInQueue(sender);
                break;
            }
        }
        LOG.debug("Shutdown of Lindorm TSDB Client has completed.");
    }

    /**
     * Creates a TLS socket factory backed by {@link TrustManagerImpl}, i.e. one that
     * accepts any server certificate.
     *
     * <p>SECURITY NOTE(review): sockets produced by this factory do not validate the peer's
     * certificate chain. Confirm this is intentional.
     *
     * @return a socket factory from a freshly initialised {@code "TLS"} context; never {@code null}
     * @throws ClientException if the TLS context cannot be created or initialised
     */
    public static SSLSocketFactory socketFactory() {
        try {
            SSLContext sSLContext = SSLContext.getInstance("TLS");
            sSLContext.init(null, new TrustManager[]{new TrustManagerImpl()}, new SecureRandom());
            return sSLContext.getSocketFactory();
        } catch (Exception e) {
            // Failing here with the real cause is far easier to diagnose than the
            // NullPointerException a null factory would trigger in the HTTP client builder.
            LOG.error("Failed to create the SSL socket factory", e);
            throw new ClientException(e);
        }
    }

    /** Tells every batch sender to stop running; nothing is flushed. */
    void stopAllSenders() {
        for (RecordBatchSender sender : this.batchSenders) {
            sender.setRunning(false);
        }
    }
}
