/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client.impl.collector;

import com.alibaba.hologres.client.Get;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.exception.HoloClientWithDetailsException;
import com.alibaba.hologres.client.impl.ExecutionPool;
import com.alibaba.hologres.client.impl.collector.DefaultResizePolicy;
import com.alibaba.hologres.client.impl.collector.ResizePolicy;
import com.alibaba.hologres.client.impl.collector.TableCollector;
import com.alibaba.hologres.client.model.Partition;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.TableName;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.utils.IdentifierUtil;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers write {@link Record}s in one {@link TableCollector} per table and hands {@link Get}
 * requests over to a shared queue.
 *
 * <p>Locking: {@link #append(Record)} and {@link #tryFlush()} take the read side of an internal
 * {@link ReentrantReadWriteLock}, so they may run concurrently with each other, while
 * {@link #flush(boolean)} takes the write side and therefore excludes both of them.
 *
 * <p>Failures of type {@link HoloClientWithDetailsException} raised while flushing in the
 * background are merged into {@code lastException} and rethrown by a later
 * {@link #append(Record)} or non-internal {@link #flush(boolean)} call.
 */
public class ActionCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActionCollector.class);

    /** Maximum time, in milliseconds, to wait for free space in the get queue. */
    private static final long GET_QUEUE_OFFER_TIMEOUT_MS = 10000L;

    Map<TableName, TableCollector> map;
    private final ReentrantReadWriteLock flushLock = new ReentrantReadWriteLock();
    final HoloConfig config;
    final ExecutionPool pool;
    final ArrayBlockingQueue<Get> queue;
    private final ResizePolicy resizePolicy;
    private final long writerShardCountResizeIntervalNano;

    /** Pending flush failure that has not been reported to a caller yet; {@code null} if none. */
    AtomicReference<HoloClientWithDetailsException> lastException = new AtomicReference<>();

    /**
     * Creates a collector with an empty table map and a {@link DefaultResizePolicy} initialised
     * from {@code config}.
     *
     * @param config client configuration; also supplies the shard-count resize interval (ms)
     * @param pool execution pool passed to every {@link TableCollector} and used for
     *     partition/schema lookups
     * @param queue queue that receives the {@link Get} requests
     */
    public ActionCollector(HoloConfig config, ExecutionPool pool, ArrayBlockingQueue<Get> queue) {
        this.map = new ConcurrentHashMap<TableName, TableCollector>();
        this.config = config;
        this.pool = pool;
        this.queue = queue;
        this.resizePolicy = new DefaultResizePolicy();
        this.resizePolicy.init(config);
        // Milliseconds -> nanoseconds, so the interval can be compared with System.nanoTime().
        this.writerShardCountResizeIntervalNano = config.getWriterShardCountResizeIntervalMs() * 1000000L;
    }

    /**
     * Returns the sum of {@link TableCollector#getByteSize()} over all table collectors.
     *
     * @return total byte size reported by the collectors, {@code 0} when there are none
     */
    public long getByteSize() {
        return this.map.values().stream().mapToLong(TableCollector::getByteSize).sum();
    }

    /**
     * Appends a record to the collector of its table, creating that collector on first use.
     *
     * <p>The pending flush failure is checked <em>after</em> the record has been handed to the
     * collector, so the record is already buffered when the failure is thrown.
     *
     * @param record record to buffer
     * @throws HoloClientException the pending flush failure (which is cleared by this call), or
     *     whatever the table collector's append propagates
     */
    public void append(Record record) throws HoloClientException {
        this.flushLock.readLock().lock();
        try {
            TableCollector collector = this.map.computeIfAbsent(
                    record.getSchema().getTableNameObj(),
                    tableName -> new TableCollector(this.config, this.pool));
            collector.append(record);
            HoloClientException pending = this.lastException.getAndSet(null);
            if (pending != null) {
                throw pending;
            }
        } finally {
            this.flushLock.readLock().unlock();
        }
    }

    /**
     * Redirects a get on a partition parent table to the matching child table.
     *
     * @param get request whose record may be switched to the child table schema
     * @return {@code true} if the request's future has already been completed here (no matching
     *     partition, or the lookup failed) and the request must not be queued; {@code false} if
     *     the request should proceed
     */
    private boolean rewriteForPartitionTable(Get get) {
        Record record = get.getRecord();
        TableSchema schema = record.getSchema();
        int partitionIndex = schema.getPartitionIndex();
        if (!schema.isPartitionParentTable() || partitionIndex <= -1) {
            return false;
        }
        try {
            // NOTE(review): 12 equals java.sql.Types.VARCHAR; presumably this flags a text-typed
            // partition column - confirm against Column#getType.
            boolean partitionColumnIsType12 = 12 == schema.getColumn(partitionIndex).getType();
            Partition partition = this.pool.getOrSubmitPartition(
                    schema.getTableNameObj(),
                    String.valueOf(record.getObject(partitionIndex)),
                    partitionColumnIsType12);
            if (partition == null) {
                // No such partition: the get resolves to "no record".
                get.getFuture().complete(null);
                return true;
            }
            TableName childName = TableName.valueOf(
                    IdentifierUtil.quoteIdentifier(partition.getSchemaName(), true),
                    IdentifierUtil.quoteIdentifier(partition.getTableName(), true));
            TableSchema childSchema = this.pool.getOrSubmitTableSchema(childName, false);
            record.changeToChildSchema(childSchema);
            return false;
        } catch (HoloClientException e) {
            get.getFuture().completeExceptionally(e);
            return true;
        }
    }

    /**
     * For a full-column get, assigns {@code null} to every column of the record that is not set
     * yet, so that afterwards every column counts as set.
     */
    private static void fillUnsetColumnsWithNull(Get get) {
        if (!get.isFullColumn()) {
            return;
        }
        Record record = get.getRecord();
        int columnCount = record.getSchema().getColumnSchema().length;
        for (int i = 0; i < columnCount; ++i) {
            if (!record.isSet(i)) {
                record.setObject(i, null);
            }
        }
    }

    /**
     * Merges a flush failure into {@code lastException} so that a later caller can observe it.
     *
     * <p>NOTE(review): {@code accumulateAndGet} may re-apply the function under contention; if
     * {@code merge} mutates its receiver, a retry could merge twice - confirm.
     */
    private void recordException(HoloClientWithDetailsException failure) {
        this.lastException.accumulateAndGet(
                failure,
                (lastOne, newOne) -> lastOne == null ? newOne : lastOne.merge(newOne));
    }

    /**
     * Assigns a fresh future to the request and offers it to the get queue.
     *
     * <p>The future is completed exceptionally with a {@link TimeoutException} if the queue has
     * no room within {@value #GET_QUEUE_OFFER_TIMEOUT_MS} ms, or with the
     * {@link InterruptedException} if the calling thread is interrupted while waiting; in the
     * latter case the thread's interrupt status is restored.
     *
     * @param get request to enqueue; its result is delivered through {@code get.getFuture()}
     */
    public void appendGet(Get get) {
        get.setFuture(new CompletableFuture<Record>());
        fillUnsetColumnsWithNull(get);
        if (this.rewriteForPartitionTable(get)) {
            return;
        }
        try {
            if (!this.queue.offer(get, GET_QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                get.getFuture().completeExceptionally(new TimeoutException());
            }
        } catch (InterruptedException e) {
            get.getFuture().completeExceptionally(e);
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Batch variant of {@link #appendGet(Get)}.
     *
     * <p>Every request first gets a fresh future. The requests are then queued in list order;
     * once one offer times out, that request and all remaining ones are completed with a
     * {@link TimeoutException} without further attempts. If the calling thread is interrupted,
     * {@code completeExceptionally} is invoked on the future of every request in the list and the
     * thread's interrupt status is restored.
     *
     * @param list requests to enqueue
     */
    public void appendGet(List<Get> list) {
        for (Get get : list) {
            get.setFuture(new CompletableFuture<Record>());
            fillUnsetColumnsWithNull(get);
        }
        try {
            boolean timedOut = false;
            for (Get get : list) {
                if (timedOut) {
                    get.getFuture().completeExceptionally(new TimeoutException());
                    continue;
                }
                if (this.rewriteForPartitionTable(get)) {
                    // Already completed by the partition lookup.
                    continue;
                }
                if (!this.queue.offer(get, GET_QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    get.getFuture().completeExceptionally(new TimeoutException());
                    timedOut = true;
                }
            }
        } catch (InterruptedException e) {
            for (Get get : list) {
                get.getFuture().completeExceptionally(e);
            }
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Calls {@code flush(false)} on every table collector while holding the read lock.
     *
     * <p>A {@link HoloClientWithDetailsException} from a collector is merged into the pending
     * failure and iteration continues with the next collector.
     *
     * @throws HoloClientException any other client exception raised by a collector; it aborts the
     *     iteration
     */
    public void tryFlush() throws HoloClientException {
        this.flushLock.readLock().lock();
        try {
            for (TableCollector collector : this.map.values()) {
                try {
                    collector.flush(false);
                } catch (HoloClientWithDetailsException e) {
                    this.recordException(e);
                }
            }
        } finally {
            this.flushLock.readLock().unlock();
        }
    }

    /**
     * Flushes every table collector, repeating until all of them report completion in the same
     * pass, then re-evaluates the shard counts. Holds the write lock for the whole operation.
     *
     * @param internal if {@code true}, a collected {@link HoloClientWithDetailsException} is
     *     stored as the pending failure instead of being thrown
     * @throws HoloClientException when {@code internal} is {@code false}: the failures collected
     *     here merged with any pending failure (which is cleared). Regardless of
     *     {@code internal}: any other client exception raised by a collector, thrown immediately
     */
    public void flush(boolean internal) throws HoloClientException {
        this.flushLock.writeLock().lock();
        try {
            HoloClientWithDetailsException collected = null;
            AtomicInteger uncommittedActionCount = new AtomicInteger(0);
            boolean async = true;
            while (true) {
                int doneCount = 0;
                uncommittedActionCount.set(0);
                for (TableCollector collector : this.map.values()) {
                    try {
                        if (collector.flush(true, async, uncommittedActionCount)) {
                            ++doneCount;
                        }
                    } catch (HoloClientWithDetailsException e) {
                        // A failing collector is not counted as done, so another pass follows.
                        if (collected == null) {
                            collected = e;
                        } else {
                            collected.merge(e);
                        }
                    }
                }
                if (doneCount == this.map.size()) {
                    break;
                }
                // Not everything is done; if the counter stayed at zero during this pass, run
                // the following passes with async disabled.
                if (uncommittedActionCount.get() == 0) {
                    async = false;
                }
            }
            this.resize();
            if (collected == null) {
                return;
            }
            if (internal) {
                this.recordException(collected);
                return;
            }
            HoloClientWithDetailsException toThrow = this.lastException.getAndSet(null);
            if (toThrow == null) {
                toThrow = collected;
            } else {
                toThrow.merge(collected);
            }
            throw toThrow;
        } finally {
            this.flushLock.writeLock().unlock();
        }
    }

    /**
     * Asks the resize policy for a new shard count for every collector whose statistics are
     * older than the configured resize interval, applies a changed count, and clears the
     * statistics of each collector that was evaluated.
     */
    private void resize() {
        long currentNano = System.nanoTime();
        for (Map.Entry<TableName, TableCollector> entry : this.map.entrySet()) {
            TableName tableName = entry.getKey();
            TableCollector tableCollector = entry.getValue();
            if (tableCollector.getStat().getNanoTime() + this.writerShardCountResizeIntervalNano >= currentNano) {
                // Resize interval has not elapsed for this table yet.
                continue;
            }
            int currentSize = tableCollector.getShardCount();
            int size = this.resizePolicy.calculate(
                    tableName,
                    tableCollector.getStat(),
                    tableCollector.getShardCount(),
                    this.pool.getWorkerCount(),
                    currentNano);
            if (currentSize != size) {
                LOGGER.info("resize table {} shard size , {} -> {}", tableName, currentSize, size);
                tableCollector.resize(size);
            }
            tableCollector.getStat().clear();
        }
    }
}

