package org.apache.flink.connector.kinesis.source.reader;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.kinesis.source.event.SplitsFinishedEvent;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader for {@link KinesisShardSplit}s that additionally maintains one
 * {@link KinesisShardMetrics} instance per shard, keyed by shard id.
 *
 * <p>A metric group is registered when a split is added and unregistered when the split
 * finishes or when the reader is closed.
 *
 * @param <T> type of the elements emitted by this reader
 */
@Internal
public class KinesisStreamsSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<Record, T, KinesisShardSplit, KinesisShardSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);

    /** Registered shard metrics, keyed by shard id. The map instance is supplied by the caller. */
    private final Map<String, KinesisShardMetrics> shardMetricGroupMap;

    /**
     * Creates the reader.
     *
     * @param singleThreadFetcherManager fetcher manager forwarded to the base reader
     * @param recordEmitter record emitter forwarded to the base reader
     * @param configuration configuration forwarded to the base reader
     * @param sourceReaderContext reader context forwarded to the base reader
     * @param map map used to track the registered per-shard metrics; it is stored by reference
     *     and mutated by this reader
     */
    public KinesisStreamsSourceReader(SingleThreadFetcherManager<Record, KinesisShardSplit> singleThreadFetcherManager, RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext, Map<String, KinesisShardMetrics> map) {
        super(singleThreadFetcherManager, recordEmitter, configuration, sourceReaderContext);
        this.shardMetricGroupMap = map;
    }

    /**
     * Reports the finished splits to the coordinator via a {@link SplitsFinishedEvent} and
     * unregisters the metric group of each of them.
     *
     * @param map finished splits; the keys are sent in the event and are also used as the keys
     *     for the metric group lookup
     */
    protected void onSplitFinished(Map<String, KinesisShardSplitState> map) {
        // Copy the keys so the event owns its own set, independent of the caller's map.
        this.context.sendSourceEventToCoordinator(new SplitsFinishedEvent(new HashSet<>(map.keySet())));
        map.keySet().forEach(this::unregisterShardMetricGroup);
    }

    /**
     * Wraps a split into its mutable state object.
     *
     * @param kinesisShardSplit the split to wrap
     * @return a new {@link KinesisShardSplitState} for the split
     */
    public KinesisShardSplitState initializedState(KinesisShardSplit kinesisShardSplit) {
        return new KinesisShardSplitState(kinesisShardSplit);
    }

    /**
     * Converts a split state back into a split.
     *
     * @param str the split id (not used by this implementation)
     * @param kinesisShardSplitState the state to convert
     * @return the split obtained from {@code kinesisShardSplitState.getKinesisShardSplit()}
     */
    public KinesisShardSplit toSplitType(String str, KinesisShardSplitState kinesisShardSplitState) {
        return kinesisShardSplitState.getKinesisShardSplit();
    }

    /**
     * Registers a metric group for every given split and then hands the splits to the base
     * reader.
     *
     * @param list the splits to add
     */
    public void addSplits(List<KinesisShardSplit> list) {
        list.forEach(this::registerShardMetricGroup);
        super.addSplits(list);
    }

    /**
     * Closes the base reader and unregisters every metric group that is still registered.
     *
     * @throws Exception if closing the base reader fails; the metric groups are unregistered
     *     regardless
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // unregisterShardMetricGroup removes entries from shardMetricGroupMap, so iterate
            // over a snapshot of the keys. Iterating the live key set of a fail-fast map such
            // as HashMap while removing from it throws ConcurrentModificationException.
            new HashSet<>(this.shardMetricGroupMap.keySet()).forEach(this::unregisterShardMetricGroup);
        }
    }

    /**
     * Creates and stores the metrics for a shard unless an entry for its shard id already
     * exists, in which case only a warning is logged.
     */
    private void registerShardMetricGroup(KinesisShardSplit kinesisShardSplit) {
        String shardId = kinesisShardSplit.getShardId();
        if (this.shardMetricGroupMap.containsKey(shardId)) {
            LOG.warn("Metric group for shard with id {} has already been registered.", shardId);
        } else {
            this.shardMetricGroupMap.put(shardId, new KinesisShardMetrics(kinesisShardSplit, this.context.metricGroup()));
        }
    }

    /**
     * Removes the metrics stored under the given key and unregisters them; logs a warning when
     * no entry exists for the key.
     */
    private void unregisterShardMetricGroup(String shardId) {
        KinesisShardMetrics removed = this.shardMetricGroupMap.remove(shardId);
        if (removed != null) {
            removed.unregister();
        } else {
            LOG.warn("Shard metric group unregister failed. Metric group for {} does not exist.", shardId);
        }
    }
}
