package org.apache.camel.component.aws2.ddbstream;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.Record;

/* loaded from: input_file:org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.class */
public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Ddb2StreamConsumer.class);
    private final ShardIteratorHandler shardIteratorHandler;
    private final Map<String, String> lastSeenSequenceNumbers;

    public Ddb2StreamConsumer(Ddb2StreamEndpoint ddb2StreamEndpoint, Processor processor) {
        this(ddb2StreamEndpoint, processor, new ShardIteratorHandler(ddb2StreamEndpoint));
    }

    Ddb2StreamConsumer(Ddb2StreamEndpoint ddb2StreamEndpoint, Processor processor, ShardIteratorHandler shardIteratorHandler) {
        super(ddb2StreamEndpoint, processor);
        this.lastSeenSequenceNumbers = new HashMap();
        this.shardIteratorHandler = shardIteratorHandler;
    }

    protected int poll() throws Exception {
        GetRecordsResponse records;
        int i = 0;
        Map<String, String> shardIterators = this.shardIteratorHandler.getShardIterators();
        forceConsumerAsReady();
        for (Map.Entry<String, String> entry : shardIterators.entrySet()) {
            int max = Math.max(1, m13getEndpoint().getConfiguration().getMaxResultsPerRequest() / shardIterators.size());
            String key = entry.getKey();
            try {
                records = m13getEndpoint().getClient().getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(entry.getValue()).limit(Integer.valueOf(max)).build());
            } catch (ExpiredIteratorException e) {
                String str = this.lastSeenSequenceNumbers.get(key);
                LOG.warn("Expired Shard Iterator, attempting to resume from {}", str, e);
                records = m13getEndpoint().getClient().getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(this.shardIteratorHandler.requestFreshShardIterator(key, str)).limit(Integer.valueOf(max)).build());
            }
            List records2 = records.records();
            ArrayDeque arrayDeque = new ArrayDeque();
            Iterator it = records2.iterator();
            while (it.hasNext()) {
                arrayDeque.add(createExchange((Record) it.next()));
            }
            i += processBatch(CastUtils.cast(arrayDeque));
            this.shardIteratorHandler.updateShardIterator(key, records.nextShardIterator());
            if (!records2.isEmpty()) {
                this.lastSeenSequenceNumbers.put(key, ((Record) records2.get(records2.size() - 1)).dynamodb().sequenceNumber());
            }
        }
        return i;
    }

    public int processBatch(Queue<Object> queue) throws Exception {
        int size = queue.size();
        int i = 0;
        int i2 = 0;
        while (i2 < size && isBatchAllowed()) {
            Exchange exchange = (Exchange) queue.poll();
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, Integer.valueOf(i2));
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, Integer.valueOf(size));
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, Boolean.valueOf(i2 == size - 1));
            this.pendingExchanges = (size - i2) - 1;
            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
            i++;
            i2++;
        }
        return i;
    }

    protected Exchange createExchange(Record record) {
        Exchange createExchange = createExchange(true);
        createExchange.getIn().setBody(record, Record.class);
        return createExchange;
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public Ddb2StreamEndpoint m13getEndpoint() {
        return super.getEndpoint();
    }
}
