package com.aliyun.lindorm.tsdb.client.impl;

import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.shaded.com.squareup.okhttp3.MediaType;
import com.aliyun.lindorm.tsdb.client.utils.LockedBarrier;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/lindorm/tsdb/client/impl/BatchProcessor.class */
public class BatchProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class);
    public static final MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");
    private int batchSize;
    private long retryBackOffMs;
    private int maxPointBatches;
    private final LockedBarrier barrier;
    private final long maxWaitTimeMs;
    private Map<String, BatchQueue> batchQueues = new ConcurrentHashMap();
    private boolean closed = false;
    private LockedBarrier forceFlushBarrier = new LockedBarrier();
    private AtomicBoolean forceFlushing = new AtomicBoolean(false);
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    public BatchProcessor(ClientOptions clientOptions, LockedBarrier lockedBarrier) {
        this.batchSize = 500;
        this.maxPointBatches = 256;
        this.retryBackOffMs = clientOptions.getRetryBackoffMs();
        this.maxWaitTimeMs = clientOptions.getMaxWaitTimeMs();
        this.batchSize = clientOptions.getBatchSize();
        this.maxPointBatches = clientOptions.getMaxPointBatches();
        this.barrier = lockedBarrier;
    }

    public CompletableFuture<WriteResult> append(String str, Record record, long j) {
        verify();
        this.appendsInProgress.incrementAndGet();
        BatchQueue orCreateBatchQueue = getOrCreateBatchQueue(str);
        orCreateBatchQueue.lock();
        try {
            CompletableFuture<WriteResult> tryAppend = tryAppend(orCreateBatchQueue, record, j);
            if (tryAppend != null) {
                orCreateBatchQueue.unlock();
                this.appendsInProgress.decrementAndGet();
                return tryAppend;
            }
            while (orCreateBatchQueue.isReachedMaxPointBatch()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reached max record batch size. Await the queue is has space to append.");
                }
                try {
                    orCreateBatchQueue.await();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Signaled after await.");
                    }
                    CompletableFuture<WriteResult> tryAppend2 = tryAppend(orCreateBatchQueue, record, j);
                    if (tryAppend2 != null) {
                        orCreateBatchQueue.unlock();
                        this.appendsInProgress.decrementAndGet();
                        return tryAppend2;
                    }
                } catch (InterruptedException e) {
                    CompletableFuture<WriteResult> completableFuture = new CompletableFuture<>();
                    completableFuture.completeExceptionally(e);
                    orCreateBatchQueue.unlock();
                    this.appendsInProgress.decrementAndGet();
                    return completableFuture;
                }
            }
            RecordBatch recordBatch = new RecordBatch(str, this.batchSize);
            CompletableFuture<WriteResult> completableFuture2 = (CompletableFuture) Objects.requireNonNull(recordBatch.tryAppend(record, j));
            orCreateBatchQueue.addLast(recordBatch);
            orCreateBatchQueue.unlock();
            this.appendsInProgress.decrementAndGet();
            return completableFuture2;
        } catch (Throwable th) {
            orCreateBatchQueue.unlock();
            this.appendsInProgress.decrementAndGet();
            throw th;
        }
    }

    public CompletableFuture<WriteResult> append(String str, List<Record> list, long j) {
        verify();
        BatchQueue orCreateBatchQueue = getOrCreateBatchQueue(str);
        int size = list.size();
        this.appendsInProgress.addAndGet(size);
        orCreateBatchQueue.lock();
        try {
            Iterator<Record> it = list.iterator();
            CompletableFuture<WriteResult> tryAppend = tryAppend(orCreateBatchQueue, it, j);
            if (tryAppend != null && !it.hasNext()) {
                orCreateBatchQueue.unlock();
                this.appendsInProgress.addAndGet(-size);
                return tryAppend;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Append to multi point batch.");
            }
            CompletableFuture<WriteResult> completedFuture = tryAppend != null ? tryAppend : CompletableFuture.completedFuture(WriteResult.success());
            int i = tryAppend == null ? 0 : 1;
            while (it.hasNext()) {
                while (orCreateBatchQueue.isReachedMaxPointBatch()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reached max point batch size. Await the queue is has space to append.");
                    }
                    try {
                        orCreateBatchQueue.await();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Signaled after await.");
                        }
                    } catch (InterruptedException e) {
                        LOG.error("Interrupted.", e);
                        CompletableFuture<WriteResult> completableFuture = new CompletableFuture<>();
                        completableFuture.completeExceptionally(e);
                        orCreateBatchQueue.unlock();
                        this.appendsInProgress.addAndGet(-size);
                        return completableFuture;
                    }
                }
                RecordBatch recordBatch = new RecordBatch(str, this.batchSize);
                completedFuture = completedFuture.thenCombine((CompletionStage) Objects.requireNonNull(recordBatch.tryAppend(it, j)), (writeResult, writeResult2) -> {
                    return new WriteResult(writeResult.isSuccessful() && writeResult2.isSuccessful());
                });
                orCreateBatchQueue.addLast(recordBatch);
                i++;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("The records split into {} batches for database[{}].", Integer.valueOf(i), str);
            }
            signal();
            CompletableFuture<WriteResult> completableFuture2 = completedFuture;
            orCreateBatchQueue.unlock();
            this.appendsInProgress.addAndGet(-size);
            return completableFuture2;
        } catch (Throwable th) {
            orCreateBatchQueue.unlock();
            this.appendsInProgress.addAndGet(-size);
            throw th;
        }
    }

    private void signal() {
        this.barrier.signalAll();
    }

    private void verify() {
        if (this.closed) {
            throw new IllegalStateException("Lindorm TSDB closed while send in progress");
        }
        while (this.forceFlushing.get()) {
            LOG.info("Wait for the forced flush to end.");
            try {
                this.forceFlushBarrier.await(500L);
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignored interrupt event.", e);
                }
            }
        }
    }

    public void startForceFlushing() {
        this.forceFlushing.set(true);
    }

    public void finishForceFlushing() {
        this.forceFlushing.set(false);
        this.forceFlushBarrier.signalAll();
        Iterator<Map.Entry<String, BatchQueue>> it = this.batchQueues.entrySet().iterator();
        while (it.hasNext()) {
            BatchQueue value = it.next().getValue();
            value.lock();
            try {
                value.signalAll();
                value.unlock();
            } catch (Throwable th) {
                value.unlock();
                throw th;
            }
        }
    }

    private CompletableFuture<WriteResult> tryAppend(BatchQueue batchQueue, Record record, long j) {
        RecordBatch peekLast = batchQueue.peekLast();
        if (peekLast == null) {
            return null;
        }
        CompletableFuture<WriteResult> tryAppend = peekLast.tryAppend(record, j);
        if (tryAppend == null) {
            peekLast.closeForPointsAppends();
            return null;
        }
        signal();
        return tryAppend;
    }

    private CompletableFuture<WriteResult> tryAppend(BatchQueue batchQueue, Iterator<Record> it, long j) {
        RecordBatch peekLast = batchQueue.peekLast();
        if (peekLast == null) {
            return null;
        }
        CompletableFuture<WriteResult> tryAppend = peekLast.tryAppend(it, j);
        if (tryAppend != null) {
            return tryAppend;
        }
        peekLast.closeForPointsAppends();
        return null;
    }

    private BatchQueue getOrCreateBatchQueue(String str) {
        return this.batchQueues.computeIfAbsent(str, str2 -> {
            return new BatchQueue(str, this.maxPointBatches);
        });
    }

    public Map<String, List<RecordBatch>> drain(long j) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, BatchQueue> entry : this.batchQueues.entrySet()) {
            String key = entry.getKey();
            BatchQueue value = entry.getValue();
            value.lock();
            try {
                RecordBatch peekFirst = value.peekFirst();
                if (peekFirst == null) {
                    value.unlock();
                } else if (peekFirst.attempts() > 0 && peekFirst.waitedTimeMs(j) < this.retryBackOffMs) {
                    value.unlock();
                } else if (peekFirst.isFrozen() || peekFirst.isFull() || isReachedMaxWaitTime(peekFirst, j)) {
                    peekFirst.closeForPointsAppends();
                    RecordBatch pollFirst = value.pollFirst();
                    pollFirst.drained(j);
                    hashMap.put(key, Collections.singletonList(pollFirst));
                    value.signalAll();
                }
            } finally {
                value.unlock();
            }
        }
        return hashMap;
    }

    public Map<String, List<RecordBatch>> drainAll() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, BatchQueue> entry : this.batchQueues.entrySet()) {
            String key = entry.getKey();
            BatchQueue value = entry.getValue();
            value.lock();
            try {
                List<RecordBatch> drainAll = value.drainAll();
                if (!drainAll.isEmpty()) {
                    hashMap.put(key, drainAll);
                    value.unlock();
                }
            } finally {
                value.unlock();
            }
        }
        return hashMap;
    }

    private boolean isReachedMaxWaitTime(RecordBatch recordBatch, long j) {
        return j - recordBatch.getCreatedMs() > this.maxWaitTimeMs;
    }

    public void close() {
        this.closed = true;
    }

    public void reenqueue(RecordBatch recordBatch, long j) {
        recordBatch.reenqueued(j);
        BatchQueue orCreateBatchQueue = getOrCreateBatchQueue(recordBatch.getDatabase());
        orCreateBatchQueue.lock();
        try {
            orCreateBatchQueue.addFirst(recordBatch);
            orCreateBatchQueue.unlock();
        } catch (Throwable th) {
            orCreateBatchQueue.unlock();
            throw th;
        }
    }
}
