package org.apache.flink.connector.kinesis.source;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.reader.fanout.StreamConsumerRegistrar;
import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.config.AWSConfigOptions;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.retries.StandardRetryStrategy;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.retries.api.BackoffStrategy;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.retries.api.RetryStrategy;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.AttributeMap;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

@Experimental
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/KinesisStreamsSource.class */
public class KinesisStreamsSource<T> implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
    private final String streamArn;
    private final Configuration sourceConfig;
    private final KinesisDeserializationSchema<T> deserializationSchema;
    private final KinesisShardAssigner kinesisShardAssigner;
    private final boolean preserveShardOrder;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisStreamsSource(String str, Configuration configuration, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner, boolean z) {
        Preconditions.checkNotNull(str, "No stream ARN was supplied to the KinesisStreamsSource.");
        Preconditions.checkArgument(!str.isEmpty(), "stream ARN cannot be empty string");
        Preconditions.checkNotNull(configuration, "No source config was supplied to the KinesisStreamsSource.");
        Preconditions.checkNotNull(kinesisDeserializationSchema, "No KinesisDeserializationSchema was supplied to the KinesisStreamsSource.");
        Preconditions.checkNotNull(kinesisShardAssigner, "No KinesisShardAssigner was supplied to the KinesisStreamsSource.");
        this.streamArn = str;
        this.sourceConfig = configuration;
        this.deserializationSchema = kinesisDeserializationSchema;
        this.kinesisShardAssigner = kinesisShardAssigner;
        this.preserveShardOrder = z;
    }

    public static <T> KinesisStreamsSourceBuilder<T> builder() {
        return new KinesisStreamsSourceBuilder<>();
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        setUpDeserializationSchema(sourceReaderContext);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return new KinesisStreamsSourceReader(new SingleThreadFetcherManager(getKinesisShardSplitReaderSupplier(this.sourceConfig, concurrentHashMap)), new KinesisStreamsRecordEmitter(this.deserializationSchema), this.sourceConfig, sourceReaderContext, concurrentHashMap);
    }

    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> createEnumerator(SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext) throws Exception {
        return restoreEnumerator(splitEnumeratorContext, (KinesisStreamsSourceEnumeratorState) null);
    }

    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> restoreEnumerator(SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext, KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState) throws Exception {
        KinesisStreamProxy createKinesisStreamProxy = createKinesisStreamProxy(this.sourceConfig);
        return new KinesisStreamsSourceEnumerator(splitEnumeratorContext, this.streamArn, this.sourceConfig, createKinesisStreamProxy, this.kinesisShardAssigner, kinesisStreamsSourceEnumeratorState, this.preserveShardOrder, new StreamConsumerRegistrar(this.sourceConfig, this.streamArn, createKinesisStreamProxy));
    }

    public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
        return new KinesisShardSplitSerializer();
    }

    public SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
    }

    private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(Configuration configuration, Map<String, KinesisShardMetrics> map) {
        KinesisSourceConfigOptions.ReaderType readerType = (KinesisSourceConfigOptions.ReaderType) configuration.get(KinesisSourceConfigOptions.READER_TYPE);
        switch (readerType) {
            case POLLING:
                return () -> {
                    return new PollingKinesisShardSplitReader(createKinesisStreamProxy(configuration), map, configuration);
                };
            case EFO:
                String consumerArn = getConsumerArn(this.streamArn, (String) configuration.get(KinesisSourceConfigOptions.EFO_CONSUMER_NAME));
                return () -> {
                    return new FanOutKinesisShardSplitReader(createKinesisAsyncStreamProxy(this.streamArn, configuration), consumerArn, map, (Duration) configuration.get(KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT));
                };
            default:
                throw new IllegalArgumentException("Unsupported reader type: " + readerType);
        }
    }

    private String getConsumerArn(String str, String str2) {
        StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder = createExpBackoffRetryStrategyBuilder((Duration) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION), (Duration) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION), ((Integer) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)).intValue());
        createExpBackoffRetryStrategyBuilder.retryOnExceptionOrCauseInstanceOf(ResourceNotFoundException.class);
        createExpBackoffRetryStrategyBuilder.retryOnExceptionOrCauseInstanceOf(LimitExceededException.class);
        try {
            KinesisStreamProxy createKinesisStreamProxy = createKinesisStreamProxy(this.sourceConfig, createExpBackoffRetryStrategyBuilder.build());
            try {
                String consumerARN = createKinesisStreamProxy.describeStreamConsumer(str, str2).consumerDescription().consumerARN();
                if (createKinesisStreamProxy != null) {
                    createKinesisStreamProxy.close();
                }
                return consumerARN;
            } finally {
            }
        } catch (Exception e) {
            throw new KinesisStreamsSourceException("Unable to lookup consumer ARN for stream " + str + " and consumer " + str2, e);
        }
    }

    private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(String str, Configuration configuration) {
        String orElseThrow = AWSGeneralUtil.getRegionFromArn(str).orElseThrow(() -> {
            return new IllegalStateException("Unable to determine region from stream arn");
        });
        Properties properties = new Properties();
        configuration.addAllToProperties(properties);
        properties.put("aws.region", orElseThrow);
        SdkAsyncHttpClient createAsyncHttpClient = AWSGeneralUtil.createAsyncHttpClient(AttributeMap.builder().mo3388build(), NettyNioAsyncHttpClient.builder());
        return new KinesisAsyncStreamProxy((KinesisAsyncClient) AWSClientUtil.createAwsAsyncClient(properties, createAsyncHttpClient, KinesisAsyncClient.builder(), KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX), createAsyncHttpClient);
    }

    private KinesisStreamProxy createKinesisStreamProxy(Configuration configuration) {
        return createKinesisStreamProxy(configuration, createExpBackoffRetryStrategy((Duration) this.sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION), (Duration) this.sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION), ((Integer) this.sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)).intValue()));
    }

    private KinesisStreamProxy createKinesisStreamProxy(Configuration configuration, RetryStrategy retryStrategy) {
        String orElseThrow = AWSGeneralUtil.getRegionFromArn(this.streamArn).orElseThrow(() -> {
            return new IllegalStateException("Unable to determine region from stream arn");
        });
        Properties properties = new Properties();
        configuration.addAllToProperties(properties);
        properties.put("aws.region", orElseThrow);
        ClientOverrideConfiguration.Builder retryStrategy2 = ClientOverrideConfiguration.builder().retryStrategy(retryStrategy);
        SdkHttpClient createSyncHttpClient = AWSGeneralUtil.createSyncHttpClient(properties, ApacheHttpClient.builder());
        AWSGeneralUtil.validateAwsCredentials(properties);
        return new KinesisStreamProxy((KinesisClient) AWSClientUtil.createAwsSyncClient(properties, createSyncHttpClient, KinesisClient.builder(), retryStrategy2, KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX), createSyncHttpClient);
    }

    private void setUpDeserializationSchema(final SourceReaderContext sourceReaderContext) throws Exception {
        this.deserializationSchema.open(new DeserializationSchema.InitializationContext() { // from class: org.apache.flink.connector.kinesis.source.KinesisStreamsSource.1
            public MetricGroup getMetricGroup() {
                return sourceReaderContext.metricGroup().addGroup("deserializer");
            }

            public UserCodeClassLoader getUserCodeClassLoader() {
                return sourceReaderContext.getUserCodeClassLoader();
            }
        });
    }

    private RetryStrategy createExpBackoffRetryStrategy(Duration duration, Duration duration2, int i) {
        return createExpBackoffRetryStrategyBuilder(duration, duration2, i).build();
    }

    private StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder(Duration duration, Duration duration2, int i) {
        BackoffStrategy exponentialDelayHalfJitter = BackoffStrategy.exponentialDelayHalfJitter(duration, duration2);
        return SdkDefaultRetryStrategy.standardRetryStrategyBuilder().backoffStrategy2(exponentialDelayHalfJitter).throttlingBackoffStrategy2(exponentialDelayHalfJitter).circuitBreakerEnabled(false).retryOnExceptionOrCauseInstanceOf(LimitExceededException.class).maxAttempts2(i);
    }

    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<KinesisShardSplit>) splitEnumeratorContext, (KinesisStreamsSourceEnumeratorState) obj);
    }
}
