package org.apache.flink.connector.kinesis.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.class */
public abstract class KinesisShardSplitReaderBase implements SplitReader<Record, KinesisShardSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardSplitReaderBase.class);

    /**
     * Shared result returned by {@link #fetch()} when there is nothing to hand out and no split
     * has finished: its iterator is empty, its split id is {@code null} and it is not flagged
     * complete.
     */
    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
            new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);

    /**
     * Splits currently assigned to this reader. {@link #fetch()} polls the head of the queue and
     * appends splits that are not finished back to the tail.
     */
    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();

    /** Ids of the splits that {@link #fetch()} must skip for now. */
    private final Set<String> pausedSplitIds = new HashSet<>();

    /** Per-shard metrics, looked up by shard id after a batch has been fetched. */
    private final Map<String, KinesisShardMetrics> shardMetricGroupMap;

    @Internal
    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase$KinesisRecordsWithSplitIds.class */
    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<Record> {
        private final Iterator<Record> recordsIterator;
        private final String splitId;
        private final boolean isComplete;

        public KinesisRecordsWithSplitIds(Iterator<Record> it, String str, boolean z) {
            this.recordsIterator = it;
            this.splitId = str;
            this.isComplete = z;
        }

        @Nullable
        public String nextSplit() {
            if (this.recordsIterator.hasNext()) {
                return this.splitId;
            }
            return null;
        }

        @Nullable
        /* renamed from: nextRecordFromSplit, reason: merged with bridge method [inline-methods] */
        public Record m21nextRecordFromSplit() {
            if (this.recordsIterator.hasNext()) {
                return this.recordsIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            if (this.splitId != null && !this.recordsIterator.hasNext() && this.isComplete) {
                return Collections.singleton(this.splitId);
            }
            return Collections.emptySet();
        }
    }

    /**
     * Immutable holder for the outcome of a single read against a split.
     *
     * <p>It carries the records that were read, the {@code millisBehindLatest} value that
     * {@code fetch()} forwards to the shard metrics, and a flag telling {@code fetch()} whether
     * the split is finished (a completed split is not put back on the assignment queue).
     */
    @Internal
    protected static class RecordBatch {
        private final List<Record> records;
        private final long millisBehindLatest;
        private final boolean completed;

        /**
         * @param records records obtained by the read; stored by reference, not copied
         * @param millisBehindLatest value to report through the shard metrics
         * @param completed {@code true} when no further reads are needed for the split
         */
        public RecordBatch(List<Record> records, long millisBehindLatest, boolean completed) {
            this.records = records;
            this.millisBehindLatest = millisBehindLatest;
            this.completed = completed;
        }

        /** @return the list of records passed to the constructor */
        public List<Record> getRecords() {
            return records;
        }

        /** @return the {@code millisBehindLatest} value passed to the constructor */
        public long getMillisBehindLatest() {
            return millisBehindLatest;
        }

        /** @return whether this batch marks the split as finished */
        public boolean isCompleted() {
            return completed;
        }
    }

    /**
     * Creates the reader base.
     *
     * @param shardMetricGroupMap metrics keyed by shard id; the map is stored by reference and
     *     consulted by {@link #fetch()} to publish {@code millisBehindLatest} for the shard that
     *     was just read
     */
    public KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetricGroupMap) {
        this.shardMetricGroupMap = shardMetricGroupMap;
    }

    /**
     * Reads one batch of records for the split at the head of the assignment queue.
     *
     * <p>Outcomes, in the order they are checked:
     *
     * <ul>
     *   <li>no split is assigned: the shared empty, incomplete result is returned;
     *   <li>the polled split is paused: it is appended back to the queue and the empty result is
     *       returned;
     *   <li>{@link #fetchRecords(KinesisShardSplitState)} returns {@code null}: the split is
     *       appended back to the queue and the empty result is returned;
     *   <li>otherwise the split is re-queued unless the batch is completed, the shard's
     *       {@code millisBehindLatest} metric is updated, and the records are returned. For a
     *       non-empty batch the split's next starting position is set to continue from the
     *       sequence number of the last record. An empty, completed batch yields a result that
     *       reports the split as finished.
     * </ul>
     *
     * <p>If a {@link ResourceNotFoundException} is raised while fetching, a warning is logged
     * and a result reporting the split as finished is returned.
     *
     * @return the records read for the polled split, possibly empty
     * @throws IOException declared by the signature; no statement in this method throws it
     *     directly
     */
    @Override
    public RecordsWithSplitIds<Record> fetch() throws IOException {
        KinesisShardSplitState splitState = this.assignedSplits.poll();
        if (splitState == null) {
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }
        if (this.pausedSplitIds.contains(splitState.getSplitId())) {
            // Rotate the paused split to the tail so the next call polls a different split.
            this.assignedSplits.add(splitState);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }
        try {
            RecordBatch recordBatch = fetchRecords(splitState);
            if (recordBatch == null) {
                this.assignedSplits.add(splitState);
                return INCOMPLETE_SHARD_EMPTY_RECORDS;
            }

            boolean completed = recordBatch.isCompleted();
            if (!completed) {
                this.assignedSplits.add(splitState);
            }

            // A shard without a registered metrics entry must not abort the fetch with a
            // NullPointerException: at this point the split has already been polled (and maybe
            // re-queued), so failing here would drop the batch that was just read.
            KinesisShardMetrics shardMetrics = this.shardMetricGroupMap.get(splitState.getShardId());
            if (shardMetrics != null) {
                shardMetrics.setMillisBehindLatest(recordBatch.getMillisBehindLatest());
            } else {
                LOG.debug(
                        "No metrics registered for shard {}; skipping millisBehindLatest update",
                        splitState.getShardId());
            }

            List<Record> records = recordBatch.getRecords();
            if (records.isEmpty()) {
                if (completed) {
                    return new KinesisRecordsWithSplitIds(
                            Collections.emptyIterator(), splitState.getSplitId(), true);
                }
                return INCOMPLETE_SHARD_EMPTY_RECORDS;
            }

            Record lastRecord = records.get(records.size() - 1);
            splitState.setNextStartingPosition(
                    StartingPosition.continueFromSequenceNumber(lastRecord.sequenceNumber()));
            return new KinesisRecordsWithSplitIds(
                    records.iterator(), splitState.getSplitId(), completed);
        } catch (ResourceNotFoundException e) {
            LOG.warn("Failed to fetch records from shard {}: shard no longer exists. Marking split as complete", splitState.getSplitId());
            return new KinesisRecordsWithSplitIds(Collections.emptyIterator(), splitState.getSplitId(), true);
        }
    }

    /**
     * Reads a batch of records for the given split; implemented by subclasses.
     *
     * <p>{@link #fetch()} calls this for the split it polled from the assignment queue. When the
     * call returns {@code null}, {@code fetch()} re-queues the split and returns an empty
     * result. A {@code ResourceNotFoundException} thrown from here is caught by {@code fetch()},
     * which then reports the split as finished.
     *
     * @param kinesisShardSplitState state of the split to read from
     * @return the batch that was read, or {@code null} when no batch is available
     */
    protected abstract RecordBatch fetchRecords(KinesisShardSplitState kinesisShardSplitState);

    /**
     * Registers every split carried by the change as an assigned split.
     *
     * <p>Each split is wrapped in a new {@link KinesisShardSplitState} and appended to the tail
     * of the assignment queue. The concrete kind of change is not inspected: every split
     * returned by {@code splitsChange.splits()} is treated as an addition.
     *
     * @param splitsChange change whose splits are added to this reader
     */
    @Override
    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChange) {
        for (KinesisShardSplit split : splitsChange.splits()) {
            this.assignedSplits.add(new KinesisShardSplitState(split));
        }
    }

    /**
     * Wake-up hook of the split reader; this implementation performs no action.
     */
    public void wakeUp() {
        // No-op.
    }

    /**
     * Updates the set of paused split ids.
     *
     * <p>The ids of all splits in {@code splitsToPause} are added first; afterwards the ids of
     * all splits in {@code splitsToResume} are removed, so a split contained in both collections
     * ends up resumed. Paused splits are skipped by {@code fetch()}.
     *
     * @param splitsToPause splits whose ids are added to the paused set
     * @param splitsToResume splits whose ids are removed from the paused set
     */
    public void pauseOrResumeSplits(Collection<KinesisShardSplit> splitsToPause, Collection<KinesisShardSplit> splitsToResume) {
        for (KinesisShardSplit toPause : splitsToPause) {
            this.pausedSplitIds.add(toPause.splitId());
        }
        for (KinesisShardSplit toResume : splitsToResume) {
            this.pausedSplitIds.remove(toResume.splitId());
        }
    }
}
