package com.aliyun.openservices.ots.internal.writer;

import com.aliyun.openservices.ots.ClientException;
import com.aliyun.openservices.ots.OTSAsync;
import com.aliyun.openservices.ots.internal.OTSCallback;
import com.aliyun.openservices.ots.internal.writer.RowChangeEvent;
import com.aliyun.openservices.ots.model.BatchWriteRowRequest;
import com.aliyun.openservices.ots.model.ConsumedCapacity;
import com.aliyun.openservices.ots.model.OTSContext;
import com.aliyun.openservices.ots.model.RowChange;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/ots/internal/writer/RowChangeEventHandler.class */
public class RowChangeEventHandler implements EventHandler<RowChangeEvent> {
    private OTSAsync ots;
    private WriteRPCBuffer buffer;
    private int concurrency;
    private Semaphore semaphore;
    private OTSCallback<RowChange, ConsumedCapacity> callback;
    private Executor executor;
    private Logger logger = LoggerFactory.getLogger(RowChangeEventHandler.class);
    private AtomicLong totalRequestCount = new AtomicLong();

    public RowChangeEventHandler(OTSAsync oTSAsync, WriterConfig writerConfig, OTSCallback<RowChange, ConsumedCapacity> oTSCallback, Executor executor) {
        this.ots = oTSAsync;
        this.buffer = new WriteRPCBuffer(writerConfig);
        this.concurrency = writerConfig.getConcurrency();
        this.semaphore = new Semaphore(this.concurrency);
        this.callback = oTSCallback;
        this.executor = executor;
    }

    public void onEvent(RowChangeEvent rowChangeEvent, long j, boolean z) throws Exception {
        BatchWriteRowRequest batchWriteRowRequest = null;
        boolean z2 = false;
        CountDownLatch countDownLatch = null;
        if (rowChangeEvent.type == RowChangeEvent.EventType.FLUSH) {
            this.logger.debug("FlushSignal with QueueSize: {}", Integer.valueOf(this.buffer.getTotalRowsCount()));
            if (this.buffer.getTotalRowsCount() > 0) {
                batchWriteRowRequest = this.buffer.makeRequest();
                this.buffer.clear();
            }
            z2 = true;
            countDownLatch = rowChangeEvent.latch;
        } else {
            final RowChange rowChange = rowChangeEvent.rowChange;
            if (!this.buffer.appendRowChange(rowChange)) {
                batchWriteRowRequest = this.buffer.makeRequest();
                this.buffer.clear();
                if (!this.buffer.appendRowChange(rowChange)) {
                    this.executor.execute(new Runnable() { // from class: com.aliyun.openservices.ots.internal.writer.RowChangeEventHandler.1
                        @Override // java.lang.Runnable
                        public void run() {
                            RowChangeEventHandler.this.callback.onFailed(new OTSContext(rowChange, null), new ClientException("Can not even append only one row into buffer."));
                        }
                    });
                }
            }
        }
        if (batchWriteRowRequest != null) {
            this.semaphore.acquire();
            this.logger.debug("Acquire semaphore, start send async request.");
            final BatchWriteRowRequest batchWriteRowRequest2 = batchWriteRowRequest;
            this.executor.execute(new Runnable() { // from class: com.aliyun.openservices.ots.internal.writer.RowChangeEventHandler.2
                @Override // java.lang.Runnable
                public void run() {
                    RowChangeEventHandler.this.totalRequestCount.incrementAndGet();
                    RowChangeEventHandler.this.ots.batchWriteRow(batchWriteRowRequest2, new FlushCallback(RowChangeEventHandler.this.ots, new AtomicInteger(1), RowChangeEventHandler.this.semaphore, RowChangeEventHandler.this.callback, RowChangeEventHandler.this.executor));
                }
            });
        }
        if (z2) {
            waitFlush();
            countDownLatch.countDown();
        }
    }

    private void waitFlush() throws InterruptedException {
        this.logger.debug("Wait flush.");
        for (int i = 0; i < this.concurrency; i++) {
            this.semaphore.acquire();
            this.logger.debug("Wait flush: {}, {}", Integer.valueOf(i), Integer.valueOf(this.concurrency));
        }
        this.semaphore.release(this.concurrency);
        this.logger.debug("Wait flush finished.");
    }

    public long getTotalRPCCount() {
        return this.totalRequestCount.get();
    }
}
