package com.starrocks.connector.flink.manager;

import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/manager/TransactionTableRegion.class */
/**
 * A {@link TableRegion} that buffers rows for a single database/table pair and ships them through a
 * {@link StreamLoader} under a transaction label.
 *
 * <p>Rows are appended to {@code outBuffer} by {@link #write(byte[])}. {@link #flush()} swaps that
 * buffer into {@code inBuffer} and starts a stream load; the loader then drains {@code inBuffer}
 * through {@link #read()}, one chunk (see {@link #genEntityMeta()}) per load. {@link #commit()}
 * prepares and commits the transaction identified by the current label.
 *
 * <p>Concurrency as visible in this class: {@code ctl} is used as a spin lock that serialises
 * appends to {@code outBuffer} against the buffer swap in {@link #flush()}, and {@code state}
 * makes flushing and committing mutually exclusive.
 */
public class TransactionTableRegion implements TableRegion {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionTableRegion.class);

    private final StreamLoadManager manager;
    private final StreamLoader streamLoader;
    private final String uniqueKey;
    private final String database;
    private final String table;
    private final StreamLoadTableProperties properties;
    private final StreamLoadDataFormat dataFormat;

    /** Rows currently being drained by the loader; assigned from {@code outBuffer} in {@link #flush()}. */
    private volatile Queue<byte[]> inBuffer;
    /** Size (bytes/rows) of the chunk currently being sent; regenerated by {@link #flip()}. */
    private volatile StreamLoadEntityMeta entityMeta;
    /** Transaction label; assigned through {@link #setLabel(String)} and cleared after a successful commit. */
    private volatile String label;
    private volatile Future<?> responseFuture;
    // NOTE(review): this flag is only ever written (in read()); isFlushing() consults `state` instead.
    private volatile boolean flushing;

    private final AtomicLong age = new AtomicLong(0);
    /** Bytes written but not yet handed to the loader via {@link #read()}. */
    private final AtomicLong cacheBytes = new AtomicLong();
    /** Bytes handed to the loader for the current chunk. */
    private final AtomicLong flushBytes = new AtomicLong();
    /** Rows handed to the loader for the current chunk. */
    private final AtomicLong flushRows = new AtomicLong();
    /** Spin-lock flag guarding {@code outBuffer}: {@code true} while a writer or flush() holds it. */
    private final AtomicBoolean ctl = new AtomicBoolean(false);
    /** Rows accepted by {@link #write(byte[])} and not yet flushed; {@code null} right after a flush. */
    private volatile Queue<byte[]> outBuffer = new LinkedList<>();
    private volatile long lastWriteTimeMillis = Long.MAX_VALUE;
    private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
    private volatile long lastCommitTimeMills = System.currentTimeMillis();

    /** Lifecycle of the region; flush() and commit() may only start from {@link #ACTIVE}. */
    enum State {
        ACTIVE,
        FLUSHING,
        COMMITTING
    }

    /**
     * Creates a region for one table.
     *
     * @param uniqueKey    identifier returned by {@link #getUniqueKey()}
     * @param database     target database name
     * @param table        target table name
     * @param manager      receiver of load responses and failures (see the {@code callback} methods)
     * @param properties   table properties; also supplies the data format and the chunk limit
     * @param streamLoader loader used to send data and to prepare/commit transactions
     */
    public TransactionTableRegion(String uniqueKey, String database, String table, StreamLoadManager manager,
                                  StreamLoadTableProperties properties, StreamLoader streamLoader) {
        this.uniqueKey = uniqueKey;
        this.database = database;
        this.table = table;
        this.manager = manager;
        this.properties = properties;
        this.dataFormat = properties.getDataFormat();
        this.streamLoader = streamLoader;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public StreamLoadTableProperties getProperties() {
        return this.properties;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getUniqueKey() {
        return this.uniqueKey;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getDatabase() {
        return this.database;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getTable() {
        return this.table;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void setLabel(String label) {
        this.label = label;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getLabel() {
        return this.label;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getCacheBytes() {
        return this.cacheBytes.get();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getFlushBytes() {
        return this.flushBytes.get();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public StreamLoadEntityMeta getEntityMeta() {
        return this.entityMeta;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getLastWriteTimeMillis() {
        return this.lastWriteTimeMillis;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void resetAge() {
        this.age.set(0L);
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getAndIncrementAge() {
        return this.age.getAndIncrement();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getAge() {
        return this.age.get();
    }

    /**
     * Appends one serialized row to the pending buffer.
     *
     * @param row serialized row; {@code null} is ignored
     * @return number of bytes accepted ({@code 0} for a {@code null} row)
     */
    @Override // com.starrocks.data.load.stream.TableRegion
    public int write(byte[] row) {
        if (row == null) {
            return 0;
        }
        acquireBufferLock();
        try {
            return write0(row);
        } finally {
            // Always release: a throwing write0 (it is overridable) must not wedge later writers/flushes.
            releaseBufferLock();
        }
    }

    /**
     * Performs the actual append; callers are expected to hold the buffer lock.
     *
     * @param row non-null serialized row
     * @return {@code row.length}
     */
    protected int write0(byte[] row) {
        if (this.outBuffer == null) {
            // flush() hands the previous buffer over and leaves null behind; start a fresh one lazily.
            this.outBuffer = new LinkedList<>();
        }
        this.outBuffer.offer(row);
        this.cacheBytes.addAndGet(row.length);
        this.lastWriteTimeMillis = System.currentTimeMillis();
        return row.length;
    }

    /**
     * Hands the next row of the current chunk to the loader.
     *
     * @return the next row, or {@code null} once the chunk described by the current entity meta has
     *         been fully read or the buffer is exhausted
     */
    @Override // com.starrocks.data.load.stream.TableRegion
    public byte[] read() {
        if (this.flushRows.get() == this.entityMeta.getRows()) {
            this.flushing = false;
            return null;
        }
        byte[] row = this.inBuffer.poll();
        if (row == null) {
            this.flushing = false;
            return null;
        }
        if (!this.flushing) {
            this.flushing = true;
        }
        // Move the row's bytes from the "cached" counter to the "flushed" counter.
        this.cacheBytes.addAndGet(-row.length);
        this.flushBytes.addAndGet(row.length);
        this.flushRows.incrementAndGet();
        return row;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean isFlushing() {
        return this.state.get() == State.FLUSHING;
    }

    /**
     * Swaps the pending buffer into the in-flight buffer and starts a stream load for it.
     *
     * @return {@code true} if a load was started; {@code false} if the region was not
     *         {@link State#ACTIVE} or there was nothing to flush
     */
    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean flush() {
        if (!this.state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
            return false;
        }
        Queue<byte[]> pending;
        acquireBufferLock();
        try {
            LOG.info("Flush uniqueKey : {}, label : {}, bytes : {}", this.uniqueKey, this.label, this.cacheBytes.get());
            pending = this.outBuffer;
            this.inBuffer = pending;
            this.outBuffer = null;
        } finally {
            releaseBufferLock();
        }
        if (pending == null || pending.isEmpty()) {
            // Nothing buffered: go back to ACTIVE without contacting the loader.
            this.state.compareAndSet(State.FLUSHING, State.ACTIVE);
            return false;
        }
        streamLoad();
        return true;
    }

    /**
     * Prepares and commits the transaction identified by the current label.
     *
     * <p>When no label is set, nothing is sent and the result only reflects whether unflushed bytes
     * remain. On a prepare/commit failure the error is logged and forwarded via
     * {@link #callback(Throwable)}.
     *
     * @return {@code true} if the transaction was committed, or if there was no label and no cached
     *         bytes; {@code false} otherwise (including when the region is not {@link State#ACTIVE})
     */
    public boolean commit() {
        if (!this.state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
            return false;
        }
        boolean committed;
        if (this.label != null) {
            StreamLoadSnapshot.Transaction transaction =
                    new StreamLoadSnapshot.Transaction(this.database, this.table, this.label);
            try {
                if (!this.streamLoader.prepare(transaction)) {
                    throw new StreamLoadFailException("Failed to prepare transaction, please check taskmanager log for details, " + transaction);
                }
                if (!this.streamLoader.commit(transaction)) {
                    throw new StreamLoadFailException("Failed to commit transaction, please check taskmanager log for details, " + transaction);
                }
                this.label = null;
                long now = System.currentTimeMillis();
                long sinceLastCommit = now - this.lastCommitTimeMills;
                this.lastCommitTimeMills = now;
                committed = true;
                LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, sinceLastCommit);
            } catch (Exception e) {
                // The trailing Throwable is logged with its stack trace by SLF4J.
                LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}",
                        this.database, this.table, this.label, e);
                callback(e);
                // NOTE(review): state is left at COMMITTING here, so later flush()/commit() calls on this
                // region return false. Presumably the manager fails the job after callback(e) - confirm.
                return false;
            }
        } else {
            // No transaction to commit; only report success when nothing is waiting to be flushed.
            // NOTE(review): presumably lets the caller flush first and retry the commit - confirm.
            committed = this.cacheBytes.get() == 0;
        }
        this.state.compareAndSet(State.COMMITTING, State.ACTIVE);
        return committed;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void callback(StreamLoadResponse response) {
        this.manager.callback(response);
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void callback(Throwable failure) {
        this.manager.callback(failure);
    }

    /**
     * Called when one stream load finished: records the flushed byte/row counts on the response,
     * forwards it to the manager, and either starts the next chunk or returns to {@link State#ACTIVE}.
     *
     * @param response response of the load that just finished
     */
    @Override // com.starrocks.data.load.stream.TableRegion
    public void complete(StreamLoadResponse response) {
        response.setFlushBytes(this.flushBytes.get());
        response.setFlushRows(this.flushRows.get());
        callback(response);
        LOG.info("Stream load flushed, db: {}, table: {}, label : {}", this.database, this.table, this.label);
        if (!this.inBuffer.isEmpty()) {
            // Rows beyond the last chunk remain: send the next chunk while staying in FLUSHING.
            LOG.info("Stream load continue, db: {}, table: {}, label : {}", this.database, this.table, this.label);
            streamLoad();
        } else if (this.state.compareAndSet(State.FLUSHING, State.ACTIVE)) {
            LOG.info("Stream load completed, db: {}, table: {}, label : {}", this.database, this.table, this.label);
        }
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void setResult(Future<?> result) {
        this.responseFuture = result;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public Future<?> getResult() {
        return this.responseFuture;
    }

    /** Resets the per-chunk counters and computes the entity meta for the next chunk of {@code inBuffer}. */
    protected void flip() {
        this.flushBytes.set(0L);
        this.flushRows.set(0L);
        this.responseFuture = null;
        StreamLoadEntityMeta chunkMeta = genEntityMeta();
        this.entityMeta = chunkMeta;
        LOG.info("Generate entity meta, db: {}, table: {}, total rows : {}, entity rows : {}, entity bytes : {}",
                this.database, this.table, this.inBuffer.size(), chunkMeta.getRows(), chunkMeta.getBytes());
    }

    /** Prepares the next chunk and submits it to the loader; any failure is forwarded to the manager. */
    protected void streamLoad() {
        try {
            flip();
            setResult(this.streamLoader.send(this));
        } catch (Exception e) {
            callback(e);
        }
    }

    /**
     * Computes how many leading rows of {@code inBuffer} go into the next chunk and how many bytes
     * the chunk occupies, counting the data format's first/end markers and one delimiter between
     * consecutive rows.
     *
     * <p>Rows are added while the chunk stays within {@code properties.getChunkLimit()}. The first
     * row is always included, even if it alone exceeds the limit: a zero-row chunk would make
     * {@link #read()} return {@code null} immediately and {@link #complete(StreamLoadResponse)}
     * re-issue the same empty load forever because the buffer never drains.
     *
     * @return meta holding the chunk's byte size and row count
     */
    protected StreamLoadEntityMeta genEntityMeta() {
        long chunkRows = 0;
        long chunkBytes = 0;
        int delimiterLength = this.dataFormat.delimiter() == null ? 0 : this.dataFormat.delimiter().length;
        if (this.dataFormat.first() != null) {
            chunkBytes += this.dataFormat.first().length;
        }
        if (this.dataFormat.end() != null) {
            chunkBytes += this.dataFormat.end().length;
        }
        boolean firstRow = true;
        for (byte[] row : this.inBuffer) {
            // No delimiter precedes the first row of a chunk.
            int separatorLength = firstRow ? 0 : delimiterLength;
            long rowBytes = (long) row.length + separatorLength;
            if (!firstRow && chunkBytes + rowBytes > this.properties.getChunkLimit().longValue()) {
                break;
            }
            chunkBytes += rowBytes;
            chunkRows++;
            firstRow = false;
        }
        return new StreamLoadEntityMeta(chunkBytes, chunkRows);
    }

    /** Not supported by this region type. */
    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean testPrepare() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this region type; see {@link #commit()}, which prepares and commits in one step. */
    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean prepare() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this region type. */
    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean cancel() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this region type. */
    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean isReadable() {
        throw new UnsupportedOperationException();
    }

    /** Spins until the {@code ctl} flag is flipped from {@code false} to {@code true} by this thread. */
    private void acquireBufferLock() {
        while (!this.ctl.compareAndSet(false, true)) {
            // busy-wait: the flag is only held for a buffer append or swap
        }
    }

    /** Releases the {@code ctl} flag taken by {@link #acquireBufferLock()}. */
    private void releaseBufferLock() {
        this.ctl.set(false);
    }
}
