package com.aliyun.lindorm.tsdb.client.impl;

import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.CodecType;
import com.aliyun.lindorm.tsdb.client.codec.WriteCodecFactory;
import com.aliyun.lindorm.tsdb.client.exception.ClientException;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.ErrorResult;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.HttpUrl;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.RequestBody;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.ResponseBody;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Call;
import com.aliyun.lindorm.tsdb.client.shaded.retrofit2.Response;
import com.aliyun.lindorm.tsdb.client.utils.LockedBarrier;
import com.aliyun.lindorm.tsdb.client.utils.UnsafeBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/RecordBatchSender.class */
public class RecordBatchSender implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RecordBatchSender.class);
    private final int retries;
    private final int requestTimeoutMs;
    private final long retryBackoffMs;
    private final BatchProcessor batchProcessor;
    private final LindormTSDBService service;
    private final LockedBarrier barrier;
    private final String schemaPolicy;
    private final CodecType codecType;
    private int numEmptyBatches = 0;
    private final long maxWaitTimeMs = 100;
    private volatile boolean running = true;

    public RecordBatchSender(ClientOptions clientOptions, LindormTSDBService lindormTSDBService, BatchProcessor batchProcessor, LockedBarrier lockedBarrier) {
        this.retries = clientOptions.getMaxRetries();
        this.requestTimeoutMs = clientOptions.getRequestTimeoutMs();
        this.retryBackoffMs = clientOptions.getRetryBackoffMs();
        this.schemaPolicy = clientOptions.getSchemaPolicy().name();
        this.codecType = clientOptions.getCodecType();
        this.service = lindormTSDBService;
        this.batchProcessor = batchProcessor;
        this.barrier = lockedBarrier;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.debug("Starting Lindorm TSDB sender I/O thread.");
        while (this.running) {
            try {
                runOnce();
            } catch (Exception e) {
                LOG.error("Uncaught error in Lindorm TSDB sender I/O thread: ", e);
            }
        }
        LOG.debug("Beginning shutdown of Lindorm TSDB sender I/O thread, sending remaining points.");
        while (true) {
            long currentTimeMillis = System.currentTimeMillis();
            Map<String, List<RecordBatch>> drain = this.batchProcessor.drain(currentTimeMillis);
            if (drain.isEmpty()) {
                LOG.debug("Shutdown of Lindorm TSDB sender I/O thread has completed.");
                return;
            }
            sendPointRequests(drain, currentTimeMillis, false);
        }
    }

    private void runOnce() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        Map<String, List<RecordBatch>> drain = this.batchProcessor.drain(currentTimeMillis);
        if (!drain.isEmpty()) {
            this.numEmptyBatches = 0;
            sendPointRequests(drain, currentTimeMillis, true);
        } else {
            this.numEmptyBatches++;
            if (this.numEmptyBatches > 3) {
                this.barrier.await(100L);
            }
        }
    }

    public void sendPointRequests(Map<String, List<RecordBatch>> map, long j, boolean z) {
        for (Map.Entry<String, List<RecordBatch>> entry : map.entrySet()) {
            sendPointRequest(j, entry.getKey(), this.requestTimeoutMs, entry.getValue(), z);
        }
    }

    private void sendPointRequest(long j, String str, int i, List<RecordBatch> list, boolean z) {
        for (RecordBatch recordBatch : list) {
            try {
                Call<ResponseBody> write = this.service.write(str, this.codecType.name(), this.schemaPolicy, RequestBody.create(WriteCodecFactory.encode(recordBatch.getRecords(), this.codecType), BatchProcessor.MEDIA_TYPE_STRING));
                try {
                    completeBatch(recordBatch, write, write.execute(), j, z, this.codecType);
                } catch (Exception e) {
                    failBatch(recordBatch, write, new ClientException(e), j);
                }
            } catch (Exception e2) {
                LOG.error("Failed to send record batch for {}", recordBatch.getDatabase(), e2);
                recordBatch.done(new ClientException(e2));
            }
        }
    }

    private void completeBatch(RecordBatch recordBatch, Call<ResponseBody> call, Response<ResponseBody> response, long j, boolean z, CodecType codecType) {
        int code = response.code();
        if (response.isSuccessful()) {
            recordBatch.done(null);
            return;
        }
        if (code >= 300 && code < 400) {
            recordBatch.done(new LindormTSDBException(code, HttpUrl.FRAGMENT_ENCODE_SET, "The request redirected."));
            return;
        }
        if (code >= 400) {
            try {
                byte[] bytes = response.errorBody().bytes();
                LOG.error("Failed to send points. {}", bytes);
                failBatch(recordBatch, code, bytes, j, z, codecType);
            } catch (Exception e) {
                LOG.error("Failed to parse response body", e);
                recordBatch.done(new ClientException(e));
            }
        }
    }

    private void failBatch(RecordBatch recordBatch, int i, byte[] bArr, long j, boolean z, CodecType codecType) {
        try {
            if (i >= 500) {
                if (z && canRetry(recordBatch, j)) {
                    LOG.warn("Got error send points on database {}, retrying ({} attempts left). Error: {}", new Object[]{recordBatch.getDatabase(), Integer.valueOf((this.retries - recordBatch.attempts()) - 1), bArr});
                    reenqueueBatch(recordBatch, j);
                } else {
                    recordBatch.done(convert(bArr, codecType));
                }
            } else if (i == 400) {
                recordBatch.done(convert(bArr, codecType));
            } else {
                recordBatch.done(new ClientException("status code : " + i + ", msg: " + new String(bArr, StandardCharsets.UTF_8)));
            }
        } catch (Exception e) {
            recordBatch.done(new ClientException(e));
        }
    }

    public static LindormTSDBException convert(byte[] bArr, CodecType codecType) throws Exception {
        switch (codecType) {
            case LINE_PROTOCOL:
            case JSON:
                ErrorResult fromJSON = ErrorResult.fromJSON(new String(bArr, StandardCharsets.UTF_8));
                return new LindormTSDBException(fromJSON.getCode(), fromJSON.getSqlstate(), fromJSON.getMessage());
            case BINARY:
                UnsafeBuffer unsafeBuffer = new UnsafeBuffer(bArr);
                return new LindormTSDBException(unsafeBuffer.readInt(), unsafeBuffer.readString(), unsafeBuffer.readString());
            default:
                throw new UnsupportedOperationException("Unsupported codec type: " + codecType.name());
        }
    }

    private void reenqueueBatch(RecordBatch recordBatch, long j) {
        this.batchProcessor.reenqueue(recordBatch, j);
    }

    private void failBatch(RecordBatch recordBatch, Call<ResponseBody> call, Throwable th, long j) {
        LOG.error("Failed to send points.", th);
        recordBatch.done(th);
    }

    private boolean canRetry(RecordBatch recordBatch, long j) {
        return recordBatch.attempts() < this.retries && !recordBatch.isDone();
    }

    public void setRunning(boolean z) {
        this.running = z;
    }
}
