package org.apache.flink.connector.kinesis.source.reader.fanout;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.class */
public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
    private final AsyncStreamProxy asyncStreamProxy;
    private final String consumerArn;
    private final Duration subscriptionTimeout;
    private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions;

    public FanOutKinesisShardSplitReader(AsyncStreamProxy asyncStreamProxy, String str, Map<String, KinesisShardMetrics> map, Duration duration) {
        super(map);
        this.splitSubscriptions = new HashMap();
        this.asyncStreamProxy = asyncStreamProxy;
        this.consumerArn = str;
        this.subscriptionTimeout = duration;
    }

    @Override // org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase
    protected KinesisShardSplitReaderBase.RecordBatch fetchRecords(KinesisShardSplitState kinesisShardSplitState) {
        SubscribeToShardEvent nextEvent = this.splitSubscriptions.get(kinesisShardSplitState.getShardId()).nextEvent();
        if (nextEvent == null) {
            return null;
        }
        boolean z = nextEvent.continuationSequenceNumber() == null;
        if (z) {
            this.splitSubscriptions.remove(kinesisShardSplitState.getShardId());
        }
        return new KinesisShardSplitReaderBase.RecordBatch(nextEvent.records(), nextEvent.millisBehindLatest().longValue(), z);
    }

    @Override // org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase
    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChange) {
        super.handleSplitsChanges(splitsChange);
        for (KinesisShardSplit kinesisShardSplit : splitsChange.splits()) {
            FanOutKinesisShardSubscription fanOutKinesisShardSubscription = new FanOutKinesisShardSubscription(this.asyncStreamProxy, this.consumerArn, kinesisShardSplit.getShardId(), kinesisShardSplit.getStartingPosition(), this.subscriptionTimeout);
            fanOutKinesisShardSubscription.activateSubscription();
            this.splitSubscriptions.put(kinesisShardSplit.splitId(), fanOutKinesisShardSubscription);
        }
    }

    public void close() throws Exception {
        this.asyncStreamProxy.close();
    }
}
