package com.alicloud.openservices.tablestore;

import com.alicloud.openservices.tablestore.core.utils.ParamChecker;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.ConsumedCapacity;
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.writer.DefaultWriterStatistics;
import com.alicloud.openservices.tablestore.writer.RowChangeEvent;
import com.alicloud.openservices.tablestore.writer.RowChangeEventHandler;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterStatistics;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/DefaultTableStoreWriter.class */
public class DefaultTableStoreWriter implements TableStoreWriter {
    private AsyncClientInterface ots;
    private Executor executor;
    private WriterConfig writerConfig;
    private TableStoreCallback<RowChange, ConsumedCapacity> callback;
    private TableStoreCallback<RowChange, RowWriteResult> resultCallback;
    private String tableName;
    private TableMeta tableMeta;
    private Timer flushTimer;
    private ReentrantLock lock;
    private Disruptor<RowChangeEvent> disruptor;
    private RingBuffer<RowChangeEvent> ringBuffer;
    private RowChangeEventHandler eventHandler;
    private ExecutorService disruptorExecutor;
    private DefaultWriterStatistics writerStatistics;
    private Logger logger = LoggerFactory.getLogger(TableStoreWriter.class);
    private AtomicBoolean closed = new AtomicBoolean(false);

    public DefaultTableStoreWriter(AsyncClientInterface asyncClientInterface, String str, WriterConfig writerConfig, TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback, Executor executor) {
        Preconditions.checkNotNull(asyncClientInterface, "The ots client can not be null.");
        Preconditions.checkArgument((str == null || str.isEmpty()) ? false : true, "The table name can not be null or empty.");
        Preconditions.checkNotNull(executor, "The executor service can not be null.");
        this.writerStatistics = new DefaultWriterStatistics();
        this.ots = asyncClientInterface;
        this.tableName = str;
        this.writerConfig = writerConfig;
        this.callback = tableStoreCallback;
        this.resultCallback = createResultCallback(tableStoreCallback);
        this.executor = executor;
        this.flushTimer = new Timer();
        this.lock = new ReentrantLock();
        initialize();
        this.closed.set(false);
    }

    private TableStoreCallback<RowChange, RowWriteResult> createResultCallback(final TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback) {
        if (tableStoreCallback != null) {
            return new TableStoreCallback<RowChange, RowWriteResult>() { // from class: com.alicloud.openservices.tablestore.DefaultTableStoreWriter.1
                @Override // com.alicloud.openservices.tablestore.TableStoreCallback
                public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
                    tableStoreCallback.onCompleted(rowChange, rowWriteResult.getConsumedCapacity());
                }

                @Override // com.alicloud.openservices.tablestore.TableStoreCallback
                public void onFailed(RowChange rowChange, Exception exc) {
                    tableStoreCallback.onFailed(rowChange, exc);
                }
            };
        }
        return null;
    }

    private void initialize() {
        this.logger.info("Start initialize ots writer, table name: {}.", this.tableName);
        DescribeTableRequest describeTableRequest = new DescribeTableRequest();
        describeTableRequest.setTableName(this.tableName);
        try {
            this.tableMeta = this.ots.describeTable(describeTableRequest, null).get().getTableMeta();
            this.logger.info("End initialize with table meta: {}.", this.tableMeta);
            RowChangeEvent.RowChangeEventFactory rowChangeEventFactory = new RowChangeEvent.RowChangeEventFactory();
            this.disruptorExecutor = Executors.newFixedThreadPool(1);
            this.disruptor = new Disruptor<>(rowChangeEventFactory, this.writerConfig.getBufferSize(), this.disruptorExecutor);
            this.ringBuffer = this.disruptor.getRingBuffer();
            this.eventHandler = new RowChangeEventHandler(this.ots, this.writerConfig, this.resultCallback, this.executor, this.writerStatistics);
            this.disruptor.handleEventsWith(new EventHandler[]{this.eventHandler});
            this.disruptor.start();
            startFlushTimer(this.writerConfig.getFlushInterval());
        } catch (Exception e) {
            throw new ClientException(e);
        }
    }

    public void startFlushTimer(int i) {
        this.flushTimer.cancel();
        this.flushTimer = new Timer();
        this.flushTimer.scheduleAtFixedRate(new TimerTask() { // from class: com.alicloud.openservices.tablestore.DefaultTableStoreWriter.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                DefaultTableStoreWriter.this.triggerFlush();
            }
        }, i, i);
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public void addRowChange(RowChange rowChange) {
        if (this.writerConfig.isEnableSchemaCheck()) {
            ParamChecker.checkRowChange(this.tableMeta, rowChange, this.writerConfig);
        }
        while (!addRowChangeInternal(rowChange)) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public boolean tryAddRowChange(RowChange rowChange) {
        if (this.writerConfig.isEnableSchemaCheck()) {
            ParamChecker.checkRowChange(this.tableMeta, rowChange, this.writerConfig);
        }
        return addRowChangeInternal(rowChange);
    }

    public boolean addRowChangeInternal(RowChange rowChange) {
        if (this.closed.get()) {
            throw new ClientException("The writer has been closed.");
        }
        try {
            long tryNext = this.ringBuffer.tryNext();
            ((RowChangeEvent) this.ringBuffer.get(tryNext)).setValue(rowChange);
            this.ringBuffer.publish(tryNext);
            return true;
        } catch (InsufficientCapacityException e) {
            return false;
        }
    }

    private void addSignal(ReentrantLock reentrantLock, Condition condition) {
        while (true) {
            try {
                long tryNext = this.ringBuffer.tryNext();
                ((RowChangeEvent) this.ringBuffer.get(tryNext)).setValue(reentrantLock, condition);
                this.ringBuffer.publish(tryNext);
                return;
            } catch (InsufficientCapacityException e) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public void addRowChange(List<RowChange> list, List<RowChange> list2) throws ClientException {
        list2.clear();
        for (RowChange rowChange : list) {
            try {
                addRowChange(rowChange);
            } catch (ClientException e) {
                list2.add(rowChange);
            }
        }
        if (!list2.isEmpty()) {
            throw new ClientException("There is dirty rows.");
        }
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public void setCallback(TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback) {
        this.callback = tableStoreCallback;
        this.resultCallback = createResultCallback(tableStoreCallback);
        this.eventHandler.setCallback(this.resultCallback);
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public void setResultCallback(TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback) {
        this.callback = null;
        this.resultCallback = tableStoreCallback;
        this.eventHandler.setCallback(tableStoreCallback);
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public TableStoreCallback<RowChange, ConsumedCapacity> getCallback() {
        return this.callback;
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public TableStoreCallback<RowChange, RowWriteResult> getResultCallback() {
        return this.resultCallback;
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public WriterConfig getWriterConfig() {
        return this.writerConfig;
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public WriterStatistics getWriterStatistics() {
        return this.writerStatistics;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void triggerFlush() {
        addSignal(this.lock, this.lock.newCondition());
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public synchronized void flush() throws ClientException {
        this.logger.debug("trigger flush and waiting.");
        if (this.closed.get()) {
            throw new ClientException("The writer has been closed.");
        }
        Condition newCondition = this.lock.newCondition();
        this.lock.lock();
        try {
            try {
                addSignal(this.lock, newCondition);
                newCondition.await();
                this.lock.unlock();
                this.logger.debug("user trigger flush finished.");
            } catch (InterruptedException e) {
                throw new ClientException(e);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.alicloud.openservices.tablestore.TableStoreWriter
    public synchronized void close() {
        if (this.closed.get()) {
            throw new ClientException("The writer has already been closed.");
        }
        this.flushTimer.cancel();
        flush();
        this.disruptor.shutdown();
        this.disruptorExecutor.shutdown();
        this.closed.set(true);
    }
}
