package org.apache.flink.streaming.connectors.kinesis.proxy;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonServiceException;
import org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration;
import org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory;
import org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.InvalidArgumentException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.LimitExceededException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ListShardsRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ListShardsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.StreamStatus;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.class */
public class KinesisProxy implements KinesisProxyInterface {
    /** Logger used for the backoff/retry warnings and stream-state notices emitted by this class. */
    private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
    /** Shared helper that computes the backoff interval for each retry and performs the sleep. */
    private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
    /** Client through which every Kinesis call is issued; built by {@code createKinesisClient(Properties)}. */
    private final AmazonKinesis kinesisClient;

    // --- listShards retry settings (ConsumerConfigConstants.LIST_SHARDS_*; defaults 1000 / 5000 / 1.5 / 10) ---
    private final long listShardsBaseBackoffMillis;
    private final long listShardsMaxBackoffMillis;
    private final double listShardsExpConstant;
    private final int listShardsMaxRetries;

    // --- getRecords retry settings (ConsumerConfigConstants.SHARD_GETRECORDS_*; defaults 300 / 1000 / 1.5 / 3) ---
    private final long getRecordsBaseBackoffMillis;
    private final long getRecordsMaxBackoffMillis;
    private final double getRecordsExpConstant;
    private final int getRecordsMaxRetries;

    // --- getShardIterator retry settings (ConsumerConfigConstants.SHARD_GETITERATOR_*; defaults 300 / 1000 / 1.5 / 3) ---
    private final long getShardIteratorBaseBackoffMillis;
    private final long getShardIteratorMaxBackoffMillis;
    private final double getShardIteratorExpConstant;
    private final int getShardIteratorMaxRetries;

    // --- describeStream backoff settings (ConsumerConfigConstants.STREAM_DESCRIBE_*; defaults 2000 / 5000 / 1.5) ---
    // There is no retry cap for describeStream: describeStream() keeps retrying until it gets a result.
    private final long describeStreamBaseBackoffMillis;
    private final long describeStreamMaxBackoffMillis;
    private final double describeStreamExpConstant;

    /**
     * Creates a proxy configured from the given properties.
     *
     * <p>The properties are first passed through {@code KinesisConfigUtil.backfillConsumerKeys},
     * then used to build the Kinesis client and to read the backoff/retry settings of each
     * operation. Any setting that is absent falls back to the default passed below.
     *
     * <p>Note that {@link #createKinesisClient(Properties)} is overridable and is invoked from
     * this constructor, i.e. before a subclass's own fields have been initialized.
     *
     * @param properties consumer configuration; must not be null
     * @throws NumberFormatException if a configured value cannot be parsed as a number
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public KinesisProxy(Properties properties) {
        Preconditions.checkNotNull(properties);
        KinesisConfigUtil.backfillConsumerKeys(properties);
        this.kinesisClient = createKinesisClient(properties);

        // listShards
        this.listShardsBaseBackoffMillis = longSetting(properties, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE, 1000L);
        this.listShardsMaxBackoffMillis = longSetting(properties, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX, 5000L);
        this.listShardsExpConstant = doubleSetting(properties, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, 1.5d);
        this.listShardsMaxRetries = intSetting(properties, ConsumerConfigConstants.LIST_SHARDS_RETRIES, 10);

        // describeStream
        this.describeStreamBaseBackoffMillis = longSetting(properties, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, 2000L);
        this.describeStreamMaxBackoffMillis = longSetting(properties, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, 5000L);
        this.describeStreamExpConstant = doubleSetting(properties, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, 1.5d);

        // getRecords
        this.getRecordsBaseBackoffMillis = longSetting(properties, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, 300L);
        this.getRecordsMaxBackoffMillis = longSetting(properties, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, 1000L);
        this.getRecordsExpConstant = doubleSetting(properties, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, 1.5d);
        this.getRecordsMaxRetries = intSetting(properties, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, 3);

        // getShardIterator
        this.getShardIteratorBaseBackoffMillis = longSetting(properties, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, 300L);
        this.getShardIteratorMaxBackoffMillis = longSetting(properties, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, 1000L);
        this.getShardIteratorExpConstant = doubleSetting(properties, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, 1.5d);
        this.getShardIteratorMaxRetries = intSetting(properties, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, 3);
    }

    /** Reads {@code key} as a long, using {@code fallback} when the key is not set. */
    private static long longSetting(Properties config, String key, long fallback) {
        return Long.parseLong(config.getProperty(key, Long.toString(fallback)));
    }

    /** Reads {@code key} as a double, using {@code fallback} when the key is not set. */
    private static double doubleSetting(Properties config, String key, double fallback) {
        return Double.parseDouble(config.getProperty(key, Double.toString(fallback)));
    }

    /** Reads {@code key} as an int, using {@code fallback} when the key is not set. */
    private static int intSetting(Properties config, String key, int fallback) {
        return Integer.parseInt(config.getProperty(key, Integer.toString(fallback)));
    }

    /**
     * Builds the Kinesis client used by this proxy.
     *
     * <p>Starts from the configuration produced by {@code ClientConfigurationFactory}, lets
     * {@code AWSUtil} apply the client settings found in the properties, and then delegates
     * client construction to {@code AWSUtil}. Subclasses may override this to supply a
     * different client.
     *
     * @param properties configuration handed to {@code AWSUtil}
     * @return the client to use for all Kinesis calls
     */
    protected AmazonKinesis createKinesisClient(Properties properties) {
        final ClientConfigurationFactory configFactory = new ClientConfigurationFactory();
        final ClientConfiguration awsClientConfig = configFactory.getConfig();
        AWSUtil.setAwsClientConfigProperties(awsClientConfig, properties);
        return AWSUtil.createKinesisClient(properties, awsClientConfig);
    }

    /**
     * Static factory for a {@link KinesisProxy}, exposed through its interface type.
     *
     * @param properties configuration passed unchanged to the constructor
     * @return a newly constructed proxy
     */
    public static KinesisProxyInterface create(Properties properties) {
        final KinesisProxyInterface proxy = new KinesisProxy(properties);
        return proxy;
    }

    /**
     * Fetches records for a shard iterator, retrying recoverable failures with backoff.
     *
     * <p>Up to {@code getRecordsMaxRetries + 1} attempts are made. Failures for which
     * {@link #isRecoverableSdkClientException(SdkClientException)} returns false are rethrown
     * immediately.
     *
     * @param str the shard iterator to read from
     * @param i the maximum number of records to request
     * @return the result of the first successful call
     * @throws SdkClientException if the client fails with a non-recoverable error
     * @throws RuntimeException if every attempt failed; the cause is the last recoverable failure
     * @throws InterruptedException as declared; presumably raised by the backoff sleep
     */
    @Override
    public GetRecordsResult getRecords(String str, int i) throws InterruptedException {
        final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(str);
        getRecordsRequest.setLimit(Integer.valueOf(i));

        GetRecordsResult getRecordsResult = null;
        // Remembered so that the final "retries exceeded" error can report why the attempts failed.
        SdkClientException lastFailure = null;
        int retryCount = 0;
        while (retryCount <= this.getRecordsMaxRetries && getRecordsResult == null) {
            try {
                getRecordsResult = this.kinesisClient.getRecords(getRecordsRequest);
            } catch (SdkClientException e) {
                if (!isRecoverableSdkClientException(e)) {
                    throw e;
                }
                lastFailure = e;
                final long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        this.getRecordsBaseBackoffMillis,
                        this.getRecordsMaxBackoffMillis,
                        this.getRecordsExpConstant,
                        retryCount++);
                LOG.warn("Got recoverable SdkClientException. Backing off for {} millis ({}: {})",
                        backoffMillis, e.getClass().getName(), e.getMessage());
                BACKOFF.sleep(backoffMillis);
            }
        }

        if (getRecordsResult == null) {
            throw new RuntimeException(
                    "Retries exceeded for getRecords operation - all " + this.getRecordsMaxRetries + " retry attempts failed.",
                    lastFailure);
        }
        return getRecordsResult;
    }

    /**
     * Collects the shards of every stream named in the given map.
     *
     * @param map stream name mapped to the (nullable) shard id that is used as the exclusive
     *     start shard id when listing that stream's shards
     * @return a result holding the shards retrieved for each stream
     * @throws InterruptedException as propagated from the per-stream shard listing
     */
    @Override
    public GetShardListResult getShardList(Map<String, String> map) throws InterruptedException {
        final GetShardListResult shardsByStream = new GetShardListResult();
        for (Map.Entry<String, String> streamEntry : map.entrySet()) {
            final String streamName = streamEntry.getKey();
            final String exclusiveStartShardId = streamEntry.getValue();
            final List<StreamShardHandle> shards = getShardsOfStream(streamName, exclusiveStartShardId);
            shardsByStream.addRetrievedShardsToStream(streamName, shards);
        }
        return shardsByStream;
    }

    /**
     * Obtains a shard iterator for the given shard.
     *
     * <p>The starting marker is only consulted for iterator types that need one: a
     * {@link Date} for {@code AT_TIMESTAMP}, a {@link String} sequence number for
     * {@code AT_SEQUENCE_NUMBER} and {@code AFTER_SEQUENCE_NUMBER}. For any other type it is
     * ignored.
     *
     * @param streamShardHandle the shard to get an iterator for
     * @param str the shard iterator type, as accepted by {@code ShardIteratorType.fromValue}
     * @param obj the starting marker; its required type depends on {@code str}
     * @return the shard iterator
     * @throws IllegalArgumentException if {@code obj} has the wrong type for the iterator type
     * @throws InterruptedException as propagated from the retrying request
     */
    @Override
    public String getShardIterator(StreamShardHandle streamShardHandle, String str, @Nullable Object obj) throws InterruptedException {
        final GetShardIteratorRequest request = new GetShardIteratorRequest()
                .withStreamName(streamShardHandle.getStreamName())
                .withShardId(streamShardHandle.getShard().getShardId())
                .withShardIteratorType(str);

        final ShardIteratorType iteratorType = ShardIteratorType.fromValue(str);
        if (iteratorType == ShardIteratorType.AT_TIMESTAMP) {
            if (!(obj instanceof Date)) {
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
            }
            request.setTimestamp((Date) obj);
        } else if (iteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER
                || iteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER) {
            if (!(obj instanceof String)) {
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
            }
            request.setStartingSequenceNumber((String) obj);
        }
        // Remaining iterator types need no starting marker.

        return getShardIterator(request);
    }

    /**
     * Executes a prepared shard-iterator request, retrying recoverable failures with backoff.
     *
     * <p>Up to {@code getShardIteratorMaxRetries + 1} attempts are made. Recoverability is
     * decided by {@link #isRecoverableSdkClientException(SdkClientException)}, the same check
     * used by {@code getRecords} and {@code listShards}, so client-side socket timeouts are
     * retried here as well as recoverable service errors.
     *
     * @param getShardIteratorRequest the fully populated request
     * @return the shard iterator from the first successful call
     * @throws SdkClientException if the client fails with a non-recoverable error
     * @throws RuntimeException if every attempt failed; the cause is the last recoverable failure
     * @throws InterruptedException as declared; presumably raised by the backoff sleep
     */
    private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
        GetShardIteratorResult getShardIteratorResult = null;
        // Remembered so that the final "retries exceeded" error can report why the attempts failed.
        SdkClientException lastFailure = null;
        int retryCount = 0;
        while (retryCount <= this.getShardIteratorMaxRetries && getShardIteratorResult == null) {
            try {
                getShardIteratorResult = this.kinesisClient.getShardIterator(getShardIteratorRequest);
            } catch (SdkClientException e) {
                if (!isRecoverableSdkClientException(e)) {
                    throw e;
                }
                lastFailure = e;
                final long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        this.getShardIteratorBaseBackoffMillis,
                        this.getShardIteratorMaxBackoffMillis,
                        this.getShardIteratorExpConstant,
                        retryCount++);
                LOG.warn("Got recoverable SdkClientException. Backing off for {} millis ({}: {})",
                        backoffMillis, e.getClass().getName(), e.getMessage());
                BACKOFF.sleep(backoffMillis);
            }
        }

        if (getShardIteratorResult == null) {
            throw new RuntimeException(
                    "Retries exceeded for getShardIterator operation - all " + this.getShardIteratorMaxRetries + " retry attempts failed.",
                    lastFailure);
        }
        return getShardIteratorResult.getShardIterator();
    }

    /**
     * Decides whether a failed client call is worth retrying.
     *
     * <p>Service-side exceptions are classified by
     * {@link #isRecoverableException(AmazonServiceException)}. Any other client exception is
     * recoverable only when a {@link SocketTimeoutException} is found in its cause chain.
     *
     * @param sdkClientException the failure to classify
     * @return true if the call may be retried
     */
    protected boolean isRecoverableSdkClientException(SdkClientException sdkClientException) {
        if (sdkClientException instanceof AmazonServiceException) {
            return isRecoverableException((AmazonServiceException) sdkClientException);
        }
        return ExceptionUtils.findThrowable(sdkClientException, SocketTimeoutException.class).isPresent();
    }

    /**
     * Decides whether a service-side failure is worth retrying, based on its error type.
     *
     * <ul>
     *   <li>no error type: not recoverable</li>
     *   <li>{@code Client}: recoverable only for {@code ProvisionedThroughputExceededException}</li>
     *   <li>{@code Service} or {@code Unknown}: recoverable</li>
     * </ul>
     *
     * @param amazonServiceException the failure to classify
     * @return true if the call may be retried
     */
    protected static boolean isRecoverableException(AmazonServiceException amazonServiceException) {
        final AmazonServiceException.ErrorType errorType = amazonServiceException.getErrorType();
        if (errorType == null) {
            return false;
        }
        if (errorType == AmazonServiceException.ErrorType.Client) {
            return amazonServiceException instanceof ProvisionedThroughputExceededException;
        }
        return errorType == AmazonServiceException.ErrorType.Service
                || errorType == AmazonServiceException.ErrorType.Unknown;
    }

    /**
     * Lists all shards of a stream, following pagination tokens until the listing is complete.
     *
     * @param str the stream name
     * @param str2 exclusive start shard id for the listing, or null to list from the beginning
     * @return handles for every shard listed; an empty list if any page could not be retrieved
     * @throws InterruptedException as propagated from {@code listShards}
     */
    private List<StreamShardHandle> getShardsOfStream(String str, @Nullable String str2) throws InterruptedException {
        final List<StreamShardHandle> shardsOfStream = new ArrayList<>();
        String nextToken = null;
        do {
            final ListShardsResult page = listShards(str, str2, nextToken);
            if (page == null) {
                // A page could not be retrieved: discard what was collected so far so that
                // callers never see a partial shard list.
                shardsOfStream.clear();
                return shardsOfStream;
            }
            for (Shard shard : page.getShards()) {
                shardsOfStream.add(new StreamShardHandle(str, shard));
            }
            nextToken = page.getNextToken();
        } while (nextToken != null);
        return shardsOfStream;
    }

    /**
     * Fetches one page of a stream's shard listing, retrying throttled and recoverable failures
     * with backoff.
     *
     * <p>The first page is requested by stream name and exclusive start shard id; later pages
     * are requested by pagination token only.
     *
     * @param str the stream name
     * @param str2 exclusive start shard id, or null to list from the beginning
     * @param str3 pagination token from the previous page, or null for the first page
     * @return the listing result, or null when no result could be obtained (stream not in an
     *     active state, expired pagination token, or throttling retries exhausted); the caller
     *     {@code getShardsOfStream} treats null as "no shard information"
     * @throws RuntimeException if the stream does not exist or the request arguments are invalid
     * @throws SdkClientException if a client failure is not recoverable or retries are exhausted
     * @throws InterruptedException as declared; presumably raised by the backoff sleep
     */
    private ListShardsResult listShards(String str, @Nullable String str2, @Nullable String str3) throws InterruptedException {
        final ListShardsRequest listShardsRequest = new ListShardsRequest();
        if (str3 == null) {
            listShardsRequest.setExclusiveStartShardId(str2);
            listShardsRequest.setStreamName(str);
        } else {
            // A request carrying a pagination token is populated with the token only.
            listShardsRequest.setNextToken(str3);
        }

        ListShardsResult listShardsResult = null;
        int retryCount = 0;
        while (retryCount <= this.listShardsMaxRetries && listShardsResult == null) {
            try {
                listShardsResult = this.kinesisClient.listShards(listShardsRequest);
            } catch (LimitExceededException e) {
                final long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        this.listShardsBaseBackoffMillis,
                        this.listShardsMaxBackoffMillis,
                        this.listShardsExpConstant,
                        retryCount++);
                LOG.warn("Got LimitExceededException when listing shards from stream {}. Backing off for {} millis.",
                        str, backoffMillis);
                BACKOFF.sleep(backoffMillis);
            } catch (ResourceInUseException e) {
                // Stop unconditionally: retrying here without backoff would spin, and no result
                // is available to return. The caller falls back to its previous state.
                LOG.info("The stream is currently not in active state. Reusing the older state for the time being");
                break;
            } catch (ResourceNotFoundException e) {
                throw new RuntimeException("Stream not found. Error while getting shard list.", e);
            } catch (InvalidArgumentException e) {
                throw new RuntimeException("Invalid Arguments to listShards.", e);
            } catch (ExpiredNextTokenException e) {
                LOG.warn("List Shards has an expired token. Reusing the previous state.");
                break;
            } catch (SdkClientException e) {
                // Propagate when not recoverable or when the retry budget is used up; otherwise
                // the loop would end with a null result and hide the failure.
                if (retryCount >= this.listShardsMaxRetries || !isRecoverableSdkClientException(e)) {
                    throw e;
                }
                final long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        this.listShardsBaseBackoffMillis,
                        this.listShardsMaxBackoffMillis,
                        this.listShardsExpConstant,
                        retryCount++);
                LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
                        str, backoffMillis);
                BACKOFF.sleep(backoffMillis);
            }
        }

        // Defensive filter: drop any returned shard whose id is not strictly after the exclusive
        // start shard id, in case the service response still includes such shards.
        if (str2 != null && listShardsResult != null) {
            listShardsResult.getShards().removeIf(
                    shard -> StreamShardHandle.compareShardIds(shard.getShardId(), str2) <= 0);
        }
        return listShardsResult;
    }

    /**
     * Describes a stream, retrying for as long as the call is throttled.
     *
     * <p>There is no retry cap: the call is repeated, with backoff after each
     * {@code LimitExceededException}, until a result is obtained. A warning is logged when the
     * stream is neither {@code ACTIVE} nor {@code UPDATING}.
     *
     * @param str the stream name
     * @param str2 exclusive start shard id for the description, or null
     * @return the describe-stream result
     * @throws RuntimeException if the stream does not exist
     * @throws InterruptedException as declared; presumably raised by the backoff sleep
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public DescribeStreamResult describeStream(String str, @Nullable String str2) throws InterruptedException {
        final DescribeStreamRequest request = new DescribeStreamRequest();
        request.setStreamName(str);
        request.setExclusiveStartShardId(str2);

        DescribeStreamResult result = null;
        for (int attempt = 0; result == null; ) {
            try {
                result = this.kinesisClient.describeStream(request);
            } catch (LimitExceededException throttled) {
                final long backoffMillis = BACKOFF.calculateFullJitterBackoff(
                        this.describeStreamBaseBackoffMillis,
                        this.describeStreamMaxBackoffMillis,
                        this.describeStreamExpConstant,
                        attempt);
                attempt++;
                LOG.warn(String.format("Got LimitExceededException when describing stream %s. Backing off for %d millis.", str, Long.valueOf(backoffMillis)));
                BACKOFF.sleep(backoffMillis);
            } catch (ResourceNotFoundException missing) {
                throw new RuntimeException("Error while getting stream details", missing);
            }
        }

        final String status = result.getStreamDescription().getStreamStatus();
        final boolean activeOrUpdating =
                status.equals(StreamStatus.ACTIVE.toString()) || status.equals(StreamStatus.UPDATING.toString());
        if (!activeOrUpdating && LOG.isWarnEnabled()) {
            LOG.warn(String.format("The status of stream %s is %s ; result of the current describeStream operation will not contain any shard information.", str, status));
        }
        return result;
    }
}
