package software.amazon.kinesis.leases;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.AWSExceptionManager;

/**
 * {@link ShardDetector} that lists the shards of one stream through a {@link KinesisAsyncClient}
 * and keeps the most recent listing in an in-memory map keyed by shard ID.
 *
 * <p>{@link #shard(String)} answers from that map and re-lists the stream when the map is empty,
 * when too many lookups have missed, or when the map is older than the configured age.
 */
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector {

    private static final Logger log = LoggerFactory.getLogger(KinesisShardDetector.class);
    private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
    /** Readability constant for the "propagate ResourceNotFoundException" flag of the internal list calls. */
    private static final boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;

    static {
        // Register the Kinesis exceptions that should be rethrown unchanged when they arrive
        // wrapped inside an ExecutionException.
        AWS_EXCEPTION_MANAGER.add(KinesisException.class, e -> e);
        AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, e -> e);
        AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, e -> e);
        AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, e -> e);
    }

    @NonNull
    private final KinesisAsyncClient kinesisClient;

    @NonNull
    private final StreamIdentifier streamIdentifier;
    private final long listShardsBackoffTimeInMillis;
    private final int maxListShardsRetryAttempts;
    private final long listShardsCacheAllowedAgeInSeconds;
    private final int maxCacheMissesBeforeReload;
    private final int cacheMissWarningModulus;
    private final Duration kinesisRequestTimeout;

    /** Monitor held by the public {@code listShards*} methods while they run. */
    private final Object $lock = new Object[0];
    /** Time of the last successful cache replacement; {@code null} until the first one. */
    private volatile Instant lastCacheUpdateTime;
    /** Latest shard listing keyed by shard ID; {@code null} until the first successful listing. */
    private volatile Map<String, Shard> cachedShardMap = null;
    /** Lookups that missed the cache since the counter was last reset in {@link #shard(String)}. */
    private final AtomicInteger cacheMisses = new AtomicInteger(0);

    /**
     * Creates a detector for a single stream.
     *
     * @param kinesisClient client used for every Kinesis call
     * @param streamIdentifier stream whose shards are listed
     * @param listShardsBackoffTimeInMillis sleep time after a {@link LimitExceededException}
     * @param maxListShardsRetryAttempts number of attempts made for one ListShards page
     * @param listShardsCacheAllowedAgeInSeconds cache age beyond which a missed lookup triggers a reload
     * @param maxCacheMissesBeforeReload miss count beyond which a missed lookup triggers a reload
     * @param cacheMissWarningModulus a miss is logged at WARN when the miss count is a multiple of this
     *        value (DEBUG otherwise); used as a divisor, so it must not be zero
     * @param kinesisRequestTimeout timeout applied to each Kinesis request future
     */
    public KinesisShardDetector(
            KinesisAsyncClient kinesisClient,
            StreamIdentifier streamIdentifier,
            long listShardsBackoffTimeInMillis,
            int maxListShardsRetryAttempts,
            long listShardsCacheAllowedAgeInSeconds,
            int maxCacheMissesBeforeReload,
            int cacheMissWarningModulus,
            Duration kinesisRequestTimeout) {
        this.kinesisClient = kinesisClient;
        this.streamIdentifier = streamIdentifier;
        this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
        this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
        this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
        this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
        this.cacheMissWarningModulus = cacheMissWarningModulus;
        this.kinesisRequestTimeout = kinesisRequestTimeout;
    }

    /**
     * Looks up a shard by ID in the cached shard map, reloading the map when needed.
     *
     * @param shardId ID of the shard to look up; must not be {@code null}
     * @return the matching shard, or {@code null} when it is not known (including when the shard map
     *         could not be loaded at all)
     * @throws NullPointerException if {@code shardId} is {@code null}
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public Shard shard(@NonNull String shardId) {
        if (shardId == null) {
            throw new NullPointerException("shardId is marked non-null but is null");
        }
        if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
            synchronized (this) {
                // Re-check under the monitor so concurrent callers do not all re-list.
                if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
                    listShards();
                }
            }
        }

        // listShards() returns null without touching the cache when the call is interrupted or the
        // stream is in use, so the map can still be null here. Treat that as "shard unknown"
        // instead of dereferencing null.
        Map<String, Shard> shardMap = this.cachedShardMap;
        if (shardMap == null) {
            log.warn("Shard map for stream {} could not be loaded; unable to look up shard '{}'.",
                    this.streamIdentifier, shardId);
            return null;
        }

        Shard shard = shardMap.get(shardId);
        if (shard == null
                && (this.cacheMisses.incrementAndGet() > this.maxCacheMissesBeforeReload || shouldRefreshCache())) {
            synchronized (this) {
                // Another thread may have refreshed the map while this one waited for the monitor.
                shard = this.cachedShardMap.get(shardId);
                if (shard == null) {
                    log.info("Too many shard map cache misses or cache is out of date -- forcing a refresh");
                    listShards();
                    shard = this.cachedShardMap.get(shardId);
                    if (shard == null) {
                        log.warn("Even after cache refresh shard '{}' wasn't found. This could indicate a bigger problem.", shardId);
                    }
                }
                this.cacheMisses.set(0);
            }
        }

        if (shard == null) {
            String message = String.format("Cannot find the shard given the shardId %s. Cache misses: %s", shardId, this.cacheMisses);
            if (this.cacheMisses.get() % this.cacheMissWarningModulus == 0) {
                log.warn(message);
            } else {
                log.debug(message);
            }
        }
        return shard;
    }

    /**
     * Lists all shards of the stream (no filter) and replaces the cached shard map with the result.
     *
     * @return the listed shards; empty when the stream was not found; {@code null} when the call was
     *         interrupted or the stream is in use
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public List<Shard> listShards() {
        synchronized (this.$lock) {
            return listShardsWithFilter(null);
        }
    }

    /**
     * Same as {@link #listShards()}, except that a {@link ResourceNotFoundException} is rethrown to
     * the caller instead of being turned into an empty result.
     *
     * @return the listed shards, or {@code null} when the call was interrupted or the stream is in use
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
        synchronized (this.$lock) {
            return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
        }
    }

    /**
     * Lists the shards selected by {@code shardFilter} and replaces the cached shard map with the
     * result. A {@link ResourceNotFoundException} yields an empty list.
     *
     * @param shardFilter filter placed on the first ListShards request; may be {@code null}
     * @return the listed shards, or {@code null} when the call was interrupted or the stream is in use
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
        synchronized (this.$lock) {
            return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION);
        }
    }

    /**
     * Pages through ListShards until no next token is returned, then caches and returns the shards.
     *
     * @param shardFilter filter for the first page; may be {@code null}
     * @param shouldPropagateResourceNotFoundException whether a missing stream is rethrown
     * @return all shards collected, or {@code null} as soon as any page comes back {@code null}
     *         (the cache is left untouched in that case)
     */
    private List<Shard> listShardsWithFilterInternal(
            ShardFilter shardFilter, boolean shouldPropagateResourceNotFoundException) {
        List<Shard> shards = new ArrayList<>();
        String nextToken = null;
        do {
            ListShardsResponse page = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException);
            if (page == null) {
                return null;
            }
            shards.addAll(page.shards());
            nextToken = page.nextToken();
        } while (StringUtils.isNotEmpty(nextToken));
        cachedShardMap(shards);
        return shards;
    }

    /**
     * Fetches one ListShards page, retrying after {@link LimitExceededException} up to
     * {@code maxListShardsRetryAttempts} attempts.
     *
     * @param shardFilter filter for the first page; only used when {@code nextToken} is empty
     * @param nextToken pagination token from the previous page, or {@code null}/empty for the first page
     * @param shouldPropagateResourceNotFoundException whether a missing stream is rethrown instead of
     *        being reported as an empty page
     * @return the page; an empty page when the stream is missing and the exception is not propagated;
     *         {@code null} when interrupted or when the stream is in use
     * @throws LimitExceededException if the attempts are exhausted while being throttled
     * @throws IllegalStateException if the attempts are exhausted with only {@code null} responses
     * @throws RuntimeException wrapping a {@link TimeoutException} from the request
     */
    private ListShardsResponse listShards(
            ShardFilter shardFilter, String nextToken, boolean shouldPropagateResourceNotFoundException) {
        ListShardsRequest.Builder requestBuilder = KinesisRequestsBuilder.listShardsRequestBuilder();
        if (StringUtils.isEmpty(nextToken)) {
            // First page: identify the stream. Follow-up pages carry only the token.
            requestBuilder.streamName(this.streamIdentifier.streamName()).shardFilter(shardFilter);
            this.streamIdentifier.streamArnOptional().ifPresent(arn -> requestBuilder.streamARN(arn.toString()));
        } else {
            requestBuilder.nextToken(nextToken);
        }
        ListShardsRequest request = requestBuilder.build();
        log.info("Stream {}: listing shards with list shards request {}", this.streamIdentifier, request);

        ListShardsResponse response = null;
        // Typed as LimitExceededException (the only thing ever stored here) so it can be rethrown
        // from a method that declares no checked exceptions.
        LimitExceededException lastException = null;
        int remainingAttempts = this.maxListShardsRetryAttempts;
        while (response == null) {
            try {
                try {
                    response = getListShardsResponse(request);
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt flag is not restored here; left as-is because
                    // restoring it would change how callers behave after this returns.
                    log.debug("Interrupted exception caught, shutdown initiated, returning null");
                    return null;
                } catch (ExecutionException e) {
                    // Unwrap so the handlers below see the service exception itself.
                    throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
                }
            } catch (ResourceNotFoundException e) {
                log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.", this.streamIdentifier.streamName());
                if (shouldPropagateResourceNotFoundException) {
                    throw e;
                }
                return ListShardsResponse.builder()
                        .shards(Collections.emptyList())
                        .nextToken((String) null)
                        .build();
            } catch (ResourceInUseException e) {
                log.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or Updating)");
                return null;
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            } catch (LimitExceededException e) {
                log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", this.streamIdentifier, this.listShardsBackoffTimeInMillis);
                try {
                    Thread.sleep(this.listShardsBackoffTimeInMillis);
                } catch (InterruptedException interrupted) {
                    // NOTE(review): interrupt is only logged; the retry loop keeps going.
                    log.debug("Stream {} : Sleep  was interrupted ", this.streamIdentifier, interrupted);
                }
                lastException = e;
            }
            remainingAttempts--;
            if (remainingAttempts <= 0 && response == null) {
                if (lastException != null) {
                    throw lastException;
                }
                throw new IllegalStateException("Received null from ListShards call.");
            }
        }
        return response;
    }

    /**
     * Replaces the cached shard map with one built from {@code shards} and records the update time.
     *
     * @param shards shards to index by shard ID; {@link Collectors#toMap} rejects duplicate IDs
     *        with an {@link IllegalStateException}
     */
    void cachedShardMap(List<Shard> shards) {
        this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::shardId, Function.identity()));
        this.lastCacheUpdateTime = Instant.now();
    }

    /**
     * Reports whether the cached shard map is older than {@code listShardsCacheAllowedAgeInSeconds}.
     *
     * @return {@code true} when the cache age exceeds the limit or no update time has been recorded
     */
    private boolean shouldRefreshCache() {
        Instant lastUpdate = this.lastCacheUpdateTime;
        if (lastUpdate == null) {
            // The map is published before its timestamp, so a concurrent reader can get here
            // first; with no timestamp the cache cannot be shown to be fresh.
            return true;
        }
        Duration age = Duration.between(lastUpdate, Instant.now());
        String ageDescription = String.format("Shard map cache is %d seconds old", age.getSeconds());
        if (age.compareTo(Duration.of(this.listShardsCacheAllowedAgeInSeconds, ChronoUnit.SECONDS)) > 0) {
            log.info("{}. Age exceeds limit of {} seconds -- Refreshing.", ageDescription, this.listShardsCacheAllowedAgeInSeconds);
            return true;
        }
        log.debug("{}. Age doesn't exceed limit of {} seconds.", ageDescription, this.listShardsCacheAllowedAgeInSeconds);
        return false;
    }

    /**
     * Issues one ListShards request and waits for it, bounded by {@code kinesisRequestTimeout}.
     *
     * @param listShardsRequest request to send
     * @return the response produced by the request future
     * @throws ExecutionException if the request future completed exceptionally
     * @throws TimeoutException if the request did not complete within the timeout
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public ListShardsResponse getListShardsResponse(ListShardsRequest listShardsRequest)
            throws ExecutionException, TimeoutException, InterruptedException {
        return FutureUtils.resolveOrCancelFuture(
                this.kinesisClient.listShards(listShardsRequest), this.kinesisRequestTimeout);
    }

    /**
     * Returns the child shards reported by a GetRecords call made with a {@code LATEST} iterator
     * on the given shard.
     *
     * @param shardId shard to query
     * @return the {@code childShards} value of the GetRecords response
     * @throws InterruptedException if the calling thread was interrupted while waiting
     * @throws ExecutionException if either request future completed exceptionally
     * @throws TimeoutException if either request exceeded {@code kinesisRequestTimeout}
     */
    @Override // software.amazon.kinesis.leases.ShardDetector
    public List<ChildShard> getChildShards(String shardId)
            throws InterruptedException, ExecutionException, TimeoutException {
        GetShardIteratorRequest.Builder iteratorRequestBuilder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
                .streamName(this.streamIdentifier.streamName())
                .shardIteratorType(ShardIteratorType.LATEST)
                .shardId(shardId);
        this.streamIdentifier.streamArnOptional().ifPresent(arn -> iteratorRequestBuilder.streamARN(arn.toString()));
        GetShardIteratorResponse iteratorResponse = FutureUtils.resolveOrCancelFuture(
                this.kinesisClient.getShardIterator(iteratorRequestBuilder.build()), this.kinesisRequestTimeout);

        GetRecordsRequest.Builder recordsRequestBuilder = KinesisRequestsBuilder.getRecordsRequestBuilder()
                .shardIterator(iteratorResponse.shardIterator());
        this.streamIdentifier.streamArnOptional().ifPresent(arn -> recordsRequestBuilder.streamARN(arn.toString()));
        GetRecordsResponse recordsResponse = FutureUtils.resolveOrCancelFuture(
                this.kinesisClient.getRecords(recordsRequestBuilder.build()), this.kinesisRequestTimeout);

        return recordsResponse.childShards();
    }

    /** Returns the identifier of the stream this detector lists shards for. */
    @Override // software.amazon.kinesis.leases.ShardDetector
    @NonNull
    public StreamIdentifier streamIdentifier() {
        return this.streamIdentifier;
    }

    /** Exposes the live cache-miss counter (package-private). */
    AtomicInteger cacheMisses() {
        return this.cacheMisses;
    }
}
