package com.starrocks.data.load.stream;

import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadStrategy;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/data/load/stream/DefaultStreamLoadManager.class */
public class DefaultStreamLoadManager implements StreamLoadManager, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamLoadManager.class);
    private static final Logger log = LoggerFactory.getLogger(DefaultStreamLoadManager.class);
    private final StreamLoadProperties properties;
    private final StreamLoader streamLoader;
    private final long maxCacheBytes;
    private final long maxWriteBlockCacheBytes;
    private final Map<String, TableRegion> regions;
    private final AtomicLong currentCacheBytes;
    private final AtomicLong totalFlushRows;
    private final AtomicLong numberTotalRows;
    private final AtomicLong numberLoadRows;
    private final StreamLoadStrategy loadStrategy;
    private final long scanningFrequency;
    private Thread current;
    private Thread manager;
    private volatile boolean savepoint;
    private final Lock lock;
    private final Condition writable;
    private final Condition flushable;
    private final AtomicReference<State> state;
    private volatile Throwable e;
    private final Queue<TableRegion> waitQ;
    private final Queue<TableRegion> prepareQ;
    private final Queue<TableRegion> commitQ;
    private transient LabelGeneratorFactory labelGeneratorFactory;
    private transient AtomicBoolean writeTriggerFlush;
    private transient LoadMetrics loadMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/starrocks/data/load/stream/DefaultStreamLoadManager$State.class */
    public enum State {
        ACTIVE,
        INACTIVE
    }

    public DefaultStreamLoadManager(StreamLoadProperties streamLoadProperties) {
        this(streamLoadProperties, new StreamLoadStrategy.DefaultLoadStrategy(streamLoadProperties));
    }

    public DefaultStreamLoadManager(StreamLoadProperties streamLoadProperties, StreamLoadStrategy streamLoadStrategy) {
        this.regions = new HashMap();
        this.currentCacheBytes = new AtomicLong(0L);
        this.totalFlushRows = new AtomicLong(0L);
        this.numberTotalRows = new AtomicLong(0L);
        this.numberLoadRows = new AtomicLong(0L);
        this.savepoint = false;
        this.lock = new ReentrantLock();
        this.writable = this.lock.newCondition();
        this.flushable = this.lock.newCondition();
        this.state = new AtomicReference<>(State.INACTIVE);
        this.waitQ = new ConcurrentLinkedQueue();
        this.prepareQ = new LinkedList();
        this.commitQ = new LinkedList();
        this.properties = streamLoadProperties;
        this.streamLoader = streamLoadProperties.isEnableTransaction() ? new TransactionStreamLoader() : new DefaultStreamLoader();
        this.maxCacheBytes = streamLoadProperties.getMaxCacheBytes();
        this.maxWriteBlockCacheBytes = 2 * this.maxCacheBytes;
        this.scanningFrequency = streamLoadProperties.getScanningFrequency();
        this.loadStrategy = streamLoadStrategy;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void init() {
        this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(this.properties.getLabelPrefix());
        this.writeTriggerFlush = new AtomicBoolean(false);
        this.loadMetrics = new LoadMetrics();
        if (this.state.compareAndSet(State.INACTIVE, State.ACTIVE)) {
            this.manager = new Thread(() -> {
                TableRegion orElse;
                Long l = null;
                log.info("manager running, scanningFrequency : {}", Long.valueOf(this.scanningFrequency));
                while (true) {
                    this.lock.lock();
                    try {
                        try {
                            this.flushable.await(this.scanningFrequency, TimeUnit.MILLISECONDS);
                            this.lock.unlock();
                            if (l == null || System.currentTimeMillis() - l.longValue() > 4999) {
                                log.info("manager report, current Bytes : {},  waitQ : {}, prepareQ : {}, commitQ : {}", new Object[]{Long.valueOf(this.currentCacheBytes.get()), Integer.valueOf(this.waitQ.size()), Integer.valueOf(this.prepareQ.size()), Integer.valueOf(this.commitQ.size())});
                                l = Long.valueOf(System.currentTimeMillis());
                            }
                            Iterator<TableRegion> it = this.waitQ.iterator();
                            while (it.hasNext()) {
                                TableRegion next = it.next();
                                if (next.isReadable()) {
                                    this.prepareQ.offer(next);
                                    it.remove();
                                    LOG.debug("Move table region {}.{} from waitQ to prepareQ", next.getDatabase(), next.getTable());
                                } else {
                                    next.getAndIncrementAge();
                                    LOG.debug("Increment age of table region {}.{} in waitQ", next.getDatabase(), next.getTable());
                                }
                            }
                            Iterator<TableRegion> it2 = this.prepareQ.iterator();
                            while (it2.hasNext()) {
                                TableRegion next2 = it2.next();
                                if (next2.testPrepare()) {
                                    if (!next2.isReadable()) {
                                        next2.cancel();
                                        this.waitQ.offer(next2);
                                        it2.remove();
                                        LOG.debug("Move table region {}.{} from prepareQ to waitQ", next2.getDatabase(), next2.getTable());
                                    } else if (next2.prepare()) {
                                        this.commitQ.offer(next2);
                                        it2.remove();
                                        LOG.debug("Move table region {}.{} from prepareQ to commitQ", next2.getDatabase(), next2.getTable());
                                    }
                                }
                            }
                            boolean z = false;
                            if (this.savepoint) {
                                Iterator<TableRegion> it3 = this.commitQ.iterator();
                                while (it3.hasNext()) {
                                    TableRegion next3 = it3.next();
                                    next3.getAndIncrementAge();
                                    if (next3.flush()) {
                                        this.waitQ.offer(next3);
                                        if (next3.isFlushing()) {
                                            z = true;
                                        }
                                        it3.remove();
                                        LOG.debug("Move table region {}.{} from commitQ to waitQ because of savepoint", next3.getDatabase(), next3.getTable());
                                    }
                                }
                            } else {
                                for (TableRegion tableRegion : this.loadStrategy.select(this.commitQ)) {
                                    if (tableRegion.flush()) {
                                        this.waitQ.offer(tableRegion);
                                        this.commitQ.remove(tableRegion);
                                        if (tableRegion.isFlushing()) {
                                            z = true;
                                        }
                                        LOG.debug("Move table region {}.{} from commitQ to waitQ for normal", tableRegion.getDatabase(), tableRegion.getTable());
                                    }
                                }
                            }
                            if (!z && this.currentCacheBytes.get() >= this.maxCacheBytes && (orElse = this.commitQ.stream().max((tableRegion2, tableRegion3) -> {
                                return tableRegion2.getFlushBytes() != tableRegion3.getFlushBytes() ? Long.compare(tableRegion3.getFlushBytes(), tableRegion2.getFlushBytes()) : Long.compare(tableRegion3.getCacheBytes(), tableRegion2.getCacheBytes());
                            }).orElse(null)) != null && orElse.flush()) {
                                this.commitQ.remove(orElse);
                                this.waitQ.offer(orElse);
                                LOG.debug("Move table region {}.{} from commitQ to waitQ for max cache bytes", orElse.getDatabase(), orElse.getTable());
                            }
                            if (this.savepoint) {
                                LockSupport.unpark(this.current);
                            }
                        } catch (InterruptedException e) {
                            if (this.savepoint) {
                                this.savepoint = false;
                                LockSupport.unpark(this.current);
                            }
                            this.lock.unlock();
                            return;
                        }
                    } catch (Throwable th) {
                        this.lock.unlock();
                        throw th;
                    }
                }
            }, "StarRocks-Sink-Manager");
            this.manager.setDaemon(true);
            this.manager.start();
            this.manager.setUncaughtExceptionHandler((thread, th) -> {
                log.error("StarRocks-Sink-Manager error", th);
                this.e = th;
            });
            log.info("StarRocks-Sink-Manager start, {}", EnvUtils.getGitInformation());
            this.streamLoader.start(this.properties, this);
        }
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void write(String str, String str2, String str3, String... strArr) {
        TableRegion cacheRegion = getCacheRegion(str, str2, str3);
        for (String str4 : strArr) {
            AssertNotException();
            if (LOG.isTraceEnabled()) {
                Logger logger = LOG;
                Object[] objArr = new Object[4];
                objArr[0] = str == null ? "null" : str;
                objArr[1] = str2;
                objArr[2] = str3;
                objArr[3] = str4;
                logger.trace("Write uniqueKey {}, database {}, table {}, row {}", objArr);
            }
            long addAndGet = this.currentCacheBytes.addAndGet(cacheRegion.write(str4.getBytes(StandardCharsets.UTF_8)));
            if (addAndGet >= this.maxWriteBlockCacheBytes) {
                this.lock.lock();
                int i = 0;
                while (this.currentCacheBytes.get() >= this.maxWriteBlockCacheBytes) {
                    try {
                        try {
                            AssertNotException();
                            log.info("Cache full, wait flush, currentBytes: {}, maxWriteBlockCacheBytes: {}", Long.valueOf(this.currentCacheBytes.get()), Long.valueOf(this.maxWriteBlockCacheBytes));
                            this.flushable.signal();
                            i++;
                            this.writable.await(Math.min(i, 5), TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            this.e = e;
                            throw new RuntimeException(e);
                        }
                    } finally {
                        this.lock.unlock();
                    }
                }
            } else if (addAndGet >= this.maxCacheBytes && this.writeTriggerFlush.compareAndSet(false, true)) {
                this.lock.lock();
                try {
                    this.flushable.signal();
                    this.lock.unlock();
                    LOG.info("Trigger flush, currentBytes: {}, maxCacheBytes: {}", Long.valueOf(addAndGet), Long.valueOf(this.maxCacheBytes));
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void callback(StreamLoadResponse streamLoadResponse) {
        long andAdd = streamLoadResponse.getFlushBytes() != null ? this.currentCacheBytes.getAndAdd(-streamLoadResponse.getFlushBytes().longValue()) : this.currentCacheBytes.get();
        if (streamLoadResponse.getFlushRows() != null) {
            this.totalFlushRows.addAndGet(streamLoadResponse.getFlushRows().longValue());
        }
        this.writeTriggerFlush.set(false);
        log.info("pre bytes : {}, current bytes : {}, totalFlushRows : {}", new Object[]{Long.valueOf(andAdd), Long.valueOf(this.currentCacheBytes.get()), Long.valueOf(this.totalFlushRows.get())});
        this.lock.lock();
        try {
            this.writable.signal();
            this.lock.unlock();
            if (streamLoadResponse.getException() != null) {
                log.error("Stream load failed", streamLoadResponse.getException());
                this.e = streamLoadResponse.getException();
            }
            if (streamLoadResponse.getBody() != null) {
                if (streamLoadResponse.getBody().getNumberTotalRows() != null) {
                    this.numberTotalRows.addAndGet(streamLoadResponse.getBody().getNumberTotalRows().longValue());
                }
                if (streamLoadResponse.getBody().getNumberLoadedRows() != null) {
                    this.numberLoadRows.addAndGet(streamLoadResponse.getBody().getNumberLoadedRows().longValue());
                }
            }
            if (streamLoadResponse.getException() != null) {
                this.loadMetrics.updateFailedLoad();
            } else {
                this.loadMetrics.updateSuccessLoad(streamLoadResponse);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}", this.loadMetrics);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void callback(Throwable th) {
        log.error("Stream load failed", th);
        this.e = th;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void flush() {
        log.info("Stream load manager flush");
        this.savepoint = true;
        this.current = Thread.currentThread();
        while (!check()) {
            AssertNotException();
            this.lock.lock();
            try {
                this.flushable.signal();
                LockSupport.park(this.current);
                if (!this.savepoint) {
                    break;
                }
                try {
                    Iterator<TableRegion> it = this.regions.values().iterator();
                    while (it.hasNext()) {
                        Future<?> result = it.next().getResult();
                        if (result != null) {
                            result.get();
                        }
                    }
                } catch (InterruptedException | ExecutionException e) {
                    log.warn("Flush get result failed", e);
                }
            } finally {
                this.lock.unlock();
            }
        }
        this.savepoint = false;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public StreamLoadSnapshot snapshot() {
        StreamLoadSnapshot snapshot = StreamLoadSnapshot.snapshot(this.regions.values());
        Iterator<TableRegion> it = this.regions.values().iterator();
        while (it.hasNext()) {
            it.next().setLabel(null);
        }
        return snapshot;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean prepare(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.prepare(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean commit(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.commit(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean abort(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.rollback(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void close() {
        if (this.state.compareAndSet(State.ACTIVE, State.INACTIVE)) {
            log.info("Stream load manger close, current bytes : {}, flush rows : {}, numberTotalRows : {}, numberLoadRows : {}, loadMetrics: {}", new Object[]{Long.valueOf(this.currentCacheBytes.get()), Long.valueOf(this.totalFlushRows.get()), Long.valueOf(this.numberTotalRows.get()), Long.valueOf(this.numberLoadRows.get()), this.loadMetrics});
            this.manager.interrupt();
            this.streamLoader.close();
        }
    }

    private boolean check() {
        return this.currentCacheBytes.compareAndSet(0L, 0L);
    }

    private void AssertNotException() {
        if (this.e != null) {
            log.error("catch exception, wait rollback ", this.e);
            this.streamLoader.rollback(snapshot());
            close();
            throw new RuntimeException(this.e);
        }
    }

    protected TableRegion getCacheRegion(String str, String str2, String str3) {
        if (str == null) {
            str = StreamLoadUtils.getTableUniqueKey(str2, str3);
        }
        TableRegion tableRegion = this.regions.get(str);
        if (tableRegion == null) {
            synchronized (this.regions) {
                tableRegion = this.regions.get(str);
                if (tableRegion == null) {
                    tableRegion = new BatchTableRegion(str, str2, str3, this, this.properties.getTableProperties(str), this.streamLoader, this.labelGeneratorFactory.create(str2, str3));
                    this.regions.put(str, tableRegion);
                    this.waitQ.offer(tableRegion);
                }
            }
        }
        return tableRegion;
    }

    static boolean useBatchTableRegion(StreamLoadDataFormat streamLoadDataFormat, boolean z, StarRocksVersion starRocksVersion) {
        if (streamLoadDataFormat instanceof StreamLoadDataFormat.CSVFormat) {
            return false;
        }
        if (z) {
            return true;
        }
        if (starRocksVersion == null) {
            return false;
        }
        return starRocksVersion.getMajor() < 2 || (starRocksVersion.getMajor() == 2 && starRocksVersion.getMinor() <= 2);
    }
}
