package com.xiaomi.infra.galaxy.talos.consumer;

import com.xiaomi.infra.galaxy.talos.thrift.CheckPoint;
import com.xiaomi.infra.galaxy.talos.thrift.MessageAndOffset;
import com.xiaomi.infra.galaxy.talos.thrift.QueryOffsetRequest;
import com.xiaomi.infra.galaxy.talos.thrift.UpdateOffsetRequest;
import java.util.List;
import libthrift091.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosMessageReader.class */
/**
 * {@link MessageReader} implementation that fetches messages for one topic partition through
 * {@code simpleConsumer}, hands them to {@code messageProcessor}, and stores the consumed
 * offset on the server through {@code consumerClient}.
 *
 * <p>It also implements {@link MessageCheckpointer}, so the message processor can commit
 * offsets itself when auto-commit ({@code consumerConfig.isCheckpointAutoCommit()}) is off.
 */
public class TalosMessageReader extends MessageReader implements MessageCheckpointer {
    private static final Logger LOG = LoggerFactory.getLogger(TalosMessageReader.class);

    /**
     * Creates a reader configured by the given consumer config.
     *
     * @param talosConsumerConfig consumer configuration, passed to the superclass
     */
    public TalosMessageReader(TalosConsumerConfig talosConsumerConfig) {
        super(talosConsumerConfig);
    }

    /**
     * Determines the offset to start reading from and initializes the message processor.
     *
     * <p>The configured reset value is used when the server query yields {@code -1}
     * (presumably: no offset committed yet) or when the config forces a reset on start;
     * otherwise reading resumes right after the last committed offset.
     *
     * @throws Exception if querying the offset or initializing the processor fails
     */
    @Override
    public void initStartOffset() throws Exception {
        long committedNext = queryStartOffset();
        if (committedNext == -1 || this.consumerConfig.isResetOffsetWhenStart()) {
            this.startOffset.set(this.consumerConfig.getResetOffsetValueWhenStart());
        } else {
            this.startOffset.set(committedNext);
        }
        // Only a positive start offset has a real predecessor to record as finished/committed.
        if (this.startOffset.longValue() > 0) {
            long previousOffset = this.startOffset.get() - 1;
            this.finishedOffset = previousOffset;
            this.lastCommitOffset = previousOffset;
        }
        LOG.info("Init startOffset: " + this.startOffset + " lastCommitOffset: " + this.lastCommitOffset + " for partition: " + this.topicAndPartition);
        this.messageProcessor.init(this.topicAndPartition, this.startOffset.get());
    }

    /**
     * Commits the finished offset (when auto-commit is enabled) and then shuts down the
     * message processor, passing this reader as its checkpointer.
     *
     * @throws Exception if the commit or the processor shutdown fails
     */
    @Override
    public void commitCheckPoint() throws Exception {
        innerCheckpoint();
        this.messageProcessor.shutdown(this);
    }

    /**
     * Performs one fetch round: waits out the remainder of the fetch interval, fetches
     * messages from {@code startOffset}, passes them to the processor, advances
     * {@code startOffset}, and commits the offset when {@code shoudCommit()} says so.
     *
     * <p>Any {@link Throwable} raised while fetching or processing is logged and handed to
     * {@code processFetchException}; it is not propagated to the caller.
     */
    @Override
    public void fetchData() {
        // Read the clock once: a second read could make the sleep argument negative,
        // and Thread.sleep rejects negative values with IllegalArgumentException.
        long remainingWait = (this.lastFetchTime + this.fetchInterval) - System.currentTimeMillis();
        if (remainingWait > 0) {
            try {
                Thread.sleep(remainingWait);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the fetch below still runs as before.
                Thread.currentThread().interrupt();
            }
        }
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading message from offset: " + this.startOffset.get() + " of partition: " + this.topicAndPartition.getPartitionId());
            }
            List<MessageAndOffset> messages = this.simpleConsumer.fetchMessage(this.startOffset.get());
            this.lastFetchTime = System.currentTimeMillis();
            if (messages == null || messages.isEmpty()) {
                return;
            }
            // finishedOffset is set before process() so that a checkpoint() call made by the
            // processor during this batch passes the range check against finishedOffset.
            this.finishedOffset = messages.get(messages.size() - 1).getMessageOffset();
            this.messageProcessor.process(messages, this);
            this.startOffset.set(this.finishedOffset + 1);
            if (shoudCommit()) {
                try {
                    innerCheckpoint();
                } catch (TException e2) {
                    // A failed periodic commit is logged and skipped; fetching continues.
                    LOG.error("commit offset error: " + e2.toString() + " skip to it");
                }
            }
        } catch (Throwable th) {
            LOG.error("Error: " + th.toString() + " when getting messages from topic: " + this.topicAndPartition.getTopicTalosResourceName() + " partition: " + this.topicAndPartition.getPartitionId());
            processFetchException(th);
            this.lastFetchTime = System.currentTimeMillis();
        }
    }

    /**
     * Queries the server for this consumer group's offset on the partition.
     *
     * @return {@code -1} if the server returns {@code -1}; otherwise the returned offset plus one
     * @throws TException if the query fails
     */
    private long queryStartOffset() throws TException {
        long msgOffset = this.consumerClient.queryOffset(new QueryOffsetRequest(this.consumerGroup, this.topicAndPartition)).getMsgOffset();
        if (msgOffset == -1) {
            return -1L;
        }
        return msgOffset + 1;
    }

    /**
     * Commits {@code finishedOffset} if auto-commit is enabled; does nothing otherwise.
     * A commit rejected by the server is only logged (by {@link #commitOffset(long)}).
     *
     * @throws TException if the commit call fails
     */
    private void innerCheckpoint() throws TException {
        if (this.consumerConfig.isCheckpointAutoCommit()) {
            commitOffset(this.finishedOffset);
        }
    }

    /**
     * Commits the offset of the last delivered message.
     *
     * @return {@code true} if the offset was committed, {@code false} otherwise
     */
    @Override
    public boolean checkpoint() {
        return checkpoint(this.finishedOffset);
    }

    /**
     * Commits the given message offset on behalf of the message processor.
     *
     * <p>The request is rejected when auto-commit is enabled, or when the offset is not in
     * the range {@code (lastCommitOffset, finishedOffset]}.
     *
     * @param j the message offset to commit
     * @return {@code true} only if the server accepted the commit; {@code false} if the
     *     request was rejected, the server reported failure, or the call threw
     */
    @Override
    public boolean checkpoint(long j) {
        LOG.info("start checkpoint: " + j);
        if (this.consumerConfig.isCheckpointAutoCommit()) {
            LOG.info("You can not checkpoint through MessageCheckpointer when you set \"galaxy.talos.consumer.checkpoint.message.offset\" as \"true\"");
            return false;
        }
        if (j <= this.lastCommitOffset || j > this.finishedOffset) {
            LOG.info("checkpoint messageOffset: " + j + " in wrong range, lastCheckpoint messageOffset: " + this.lastCommitOffset + ", last deliver messageOffset: " + this.finishedOffset);
            return false;
        }
        try {
            // Report the real outcome: a commit rejected by the server is not a success.
            return commitOffset(j);
        } catch (TException e) {
            LOG.error("checkpoint messageOffset: " + j + " failed for partition: " + this.topicAndPartition.getPartitionId(), e);
            return false;
        }
    }

    /**
     * Sends an offset update to the server and, on success, records the new commit offset
     * and commit time.
     *
     * @param offset the message offset to commit
     * @return {@code true} if the server reported success, {@code false} otherwise
     * @throws TException if the update call fails
     */
    private boolean commitOffset(long offset) throws TException {
        CheckPoint checkPoint = new CheckPoint(this.consumerGroup, this.topicAndPartition, offset, this.workerId);
        // Attach the previous commit offset only when one exists and the config asks for the check.
        if (this.lastCommitOffset != -1 && this.consumerConfig.isCheckLastCommitOffset()) {
            checkPoint.setLastCommitOffset(this.lastCommitOffset);
        }
        if (!this.consumerClient.updateOffset(new UpdateOffsetRequest(checkPoint)).isSuccess()) {
            // Log the offset that failed, not the previously committed one.
            LOG.error("Worker: " + this.workerId + " commit offset: " + offset + " for partition: " + this.topicAndPartition.getPartitionId() + " failed");
            return false;
        }
        this.lastCommitOffset = offset;
        this.lastCommitTime = System.currentTimeMillis();
        LOG.info("Worker: " + this.workerId + " commit offset: " + this.lastCommitOffset + " for partition: " + this.topicAndPartition.getPartitionId());
        return true;
    }
}
