package org.apache.flink.streaming.connectors.kinesis.proxy;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration;
import org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory;
import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.flink.kinesis.shaded.com.amazonaws.regions.Region;
import org.apache.flink.kinesis.shaded.com.amazonaws.regions.Regions;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.class */
/**
 * {@link KinesisProxy} variant whose {@link AmazonKinesis} client is an
 * {@link AmazonDynamoDBStreamsAdapterClient}.
 *
 * <p>It overrides client creation and shard listing; everything else is inherited from
 * {@link KinesisProxy}.
 */
public class DynamoDBStreamsProxy extends KinesisProxy {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBStreamsProxy.class);

    /** Template for the user agent prefix; filled with the Flink version and the commit id. */
    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";

    /**
     * Creates a proxy by passing the given properties on to the {@link KinesisProxy} constructor.
     *
     * @param properties configuration properties handed to the superclass
     */
    protected DynamoDBStreamsProxy(Properties properties) {
        super(properties);
    }

    /**
     * Static factory for a new {@link DynamoDBStreamsProxy}.
     *
     * @param properties configuration properties used to build the proxy
     * @return the newly created proxy
     */
    public static KinesisProxyInterface create(Properties properties) {
        return new DynamoDBStreamsProxy(properties);
    }

    /**
     * Builds the adapter client used by this proxy.
     *
     * <p>The client configuration starts from the {@link ClientConfigurationFactory} defaults,
     * is adjusted by {@link AWSUtil#setAwsClientConfigProperties}, and gets a user agent prefix
     * built from {@link #USER_AGENT_FORMAT}. If the properties contain
     * {@link AWSConfigConstants#AWS_ENDPOINT}, that endpoint is set on the client; otherwise the
     * region named by the {@code aws.region} property is used.
     *
     * @param properties configuration properties to read credentials, endpoint and region from
     * @return the configured adapter client
     */
    @Override
    protected AmazonKinesis createKinesisClient(Properties properties) {
        ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
        AWSUtil.setAwsClientConfigProperties(clientConfig, properties);

        AWSCredentialsProvider credentials = AWSUtil.getCredentialsProvider(properties);
        clientConfig.setUserAgentPrefix(
                String.format(
                        USER_AGENT_FORMAT,
                        EnvironmentInformation.getVersion(),
                        EnvironmentInformation.getRevisionInformation().commitId));

        AmazonDynamoDBStreamsAdapterClient adapterClient =
                new AmazonDynamoDBStreamsAdapterClient(credentials, clientConfig);

        // An explicitly configured endpoint takes precedence over the region setting.
        if (properties.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
            adapterClient.setEndpoint(properties.getProperty(AWSConfigConstants.AWS_ENDPOINT));
        } else {
            adapterClient.setRegion(
                    Region.getRegion(Regions.fromName(properties.getProperty("aws.region"))));
        }
        return adapterClient;
    }

    /**
     * Retrieves the shards of every stream named in the given map.
     *
     * <p>Each map key is used as a stream name; its value is passed along as the shard id from
     * which listing of that stream starts (it may be {@code null}).
     *
     * @param map stream names mapped to the shard id to start listing from
     * @return a result holding the retrieved shards for each stream in the map
     * @throws InterruptedException propagated from the underlying describe-stream calls
     */
    @Override
    public GetShardListResult getShardList(Map<String, String> map) throws InterruptedException {
        GetShardListResult result = new GetShardListResult();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String streamName = entry.getKey();
            String lastSeenShardId = entry.getValue();
            result.addRetrievedShardsToStream(
                    streamName, getShardsOfStream(streamName, lastSeenShardId));
        }
        return result;
    }

    /**
     * Collects the shards of a single stream, page by page.
     *
     * <p>{@code describeStream} is called repeatedly; after each non-empty page the id of the
     * page's last shard becomes the start id of the next call. Paging stops once the stream
     * description no longer reports more shards.
     *
     * @param streamName name of the stream to describe
     * @param lastSeenShardId shard id passed to the first describe call; may be {@code null}
     * @return handles for all shards returned across the pages, in the order received
     * @throws InterruptedException propagated from {@code describeStream}
     */
    private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId)
            throws InterruptedException {
        List<StreamShardHandle> shardsOfStream = new ArrayList<>();

        // Keep the paging cursor in its own local instead of reassigning the parameter.
        String startShardId = lastSeenShardId;
        DescribeStreamResult describeStreamResult;
        do {
            describeStreamResult = describeStream(streamName, startShardId);
            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
            for (Shard shard : shards) {
                shardsOfStream.add(new StreamShardHandle(streamName, shard));
            }
            // NOTE(review): when a page is empty the cursor is not advanced, so a response that
            // is empty yet still reports more shards would repeat the same request - confirm the
            // service never answers that way.
            if (!shards.isEmpty()) {
                startShardId = shards.get(shards.size() - 1).getShardId();
            }
            // isHasMoreShards() yields a boxed Boolean; comparing against TRUE avoids an NPE on
            // null and treats it as "no more shards".
        } while (Boolean.TRUE.equals(describeStreamResult.getStreamDescription().isHasMoreShards()));

        return shardsOfStream;
    }
}
