package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.class */
public class SimplifiedKinesisClient implements AutoCloseable {
    // CloudWatch namespace used when building the metric statistics request.
    private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
    // CloudWatch metric name used for backlog estimation.
    // NOTE(review): the constant is named "...RECORDS..." but its value is the byte-count
    // metric "IncomingBytes"; the identifier looks stale - confirm before renaming.
    private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
    // Multiplied by the number of whole minutes in the queried range to obtain the
    // CloudWatch period (in seconds) in createMetricStatisticsRequest.
    private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
    // Name of the CloudWatch dimension whose value is the stream name.
    private static final String STREAM_NAME_DIMENSION = "StreamName";
    // maxResults value sent with every ListShards request (page size).
    private static final int LIST_SHARDS_MAX_RESULTS = 1000;
    // Retry budget for DescribeStreamSummary; equals the max-retries value used by
    // describeStreamSummary.
    private static final int DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS = 10;
    // Kinesis client, created on first use from the supplier given to the constructor.
    private final LazyResource<KinesisClient> kinesis;
    // CloudWatch client, created on first use from the supplier given to the constructor.
    private final LazyResource<CloudWatchClient> cloudWatch;
    // Record limit applied by the three-argument getRecords overload; stored as passed in.
    private final Integer limit;
    // Safety margin added to the retention boundary in buildShardFilterForTimestamp: a
    // timestamp must be later than (now - retention + this margin) to be used with an
    // AT_TIMESTAMP shard filter.
    private static final Duration SPACING_FOR_TIMESTAMP_LIST_SHARDS_REQUEST_TO_NOT_EXCEED_TRIM_HORIZON = Duration.standardMinutes(5);
    // Initial delay of the backoff used when retrying DescribeStreamSummary.
    private static final Duration DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF = Duration.standardSeconds(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient$1.class */
    /**
     * Decompiler-emitted holder of a lookup table that translates
     * {@link InitialPositionInStream} ordinals into dense labels:
     * 1 = LATEST, 2 = TRIM_HORIZON, 3 = AT_TIMESTAMP.
     *
     * <p>A slot that is never written keeps the array default of 0, which corresponds to none
     * of the labels above.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$software$amazon$kinesis$common$InitialPositionInStream;

        static {
            // Fill a local table first and publish it once it is complete.
            int[] labels = new int[InitialPositionInStream.values().length];
            try {
                labels[InitialPositionInStream.LATEST.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum seen at runtime: leave its slot at 0.
            }
            try {
                labels[InitialPositionInStream.TRIM_HORIZON.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum seen at runtime: leave its slot at 0.
            }
            try {
                labels[InitialPositionInStream.AT_TIMESTAMP.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum seen at runtime: leave its slot at 0.
            }
            $SwitchMap$software$amazon$kinesis$common$InitialPositionInStream = labels;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient$LazyResource.class */
    /**
     * Supplier that creates its {@link AutoCloseable} value on the first call to {@link #get()}
     * and hands out that same instance afterwards.
     *
     * <p>Creation is guarded by double-checked locking on a {@code volatile} field, so the
     * initializer is invoked under the instance monitor and only while no value is cached.
     * Closing closes the cached value if one was ever created and is a no-op otherwise.
     *
     * @param <T> type of the lazily created resource
     */
    public static class LazyResource<T extends AutoCloseable> implements Supplier<T>, AutoCloseable {
        private final Supplier<T> initializer;
        private volatile T resource;

        private LazyResource(Supplier<T> supplier) {
            this.initializer = supplier;
            this.resource = null;
        }

        /* synthetic */ LazyResource(Supplier supplier, AnonymousClass1 anonymousClass1) {
            // Accessor constructor; the second argument is only a signature marker and is unused.
            this(supplier);
        }

        /**
         * Returns the resource, creating it through the initializer if it is not cached yet.
         *
         * @return the cached or newly created resource
         */
        @Override // java.util.function.Supplier
        public T get() {
            // Fast path: a single volatile read when the resource already exists.
            T existing = this.resource;
            if (existing != null) {
                return existing;
            }
            synchronized (this) {
                // Re-check under the lock; another thread may have created it meanwhile.
                T rechecked = this.resource;
                if (rechecked == null) {
                    rechecked = this.initializer.get();
                    this.resource = rechecked;
                }
                return rechecked;
            }
        }

        /**
         * Closes the resource if it has been created; never triggers creation.
         *
         * @throws Exception whatever the resource's own {@code close()} throws
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            T created = this.resource;
            if (created == null) {
                return;
            }
            created.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SimplifiedKinesisClient(Supplier<KinesisClient> supplier, Supplier<CloudWatchClient> supplier2, Integer num) {
        this.kinesis = new LazyResource<>((Supplier) Preconditions.checkNotNull(supplier, "kinesis"), null);
        this.cloudWatch = new LazyResource<>((Supplier) Preconditions.checkNotNull(supplier2, "cloudWatch"), null);
        this.limit = num;
    }

    /**
     * Requests a shard iterator from Kinesis.
     *
     * @param str stream name
     * @param str2 shard id
     * @param shardIteratorType type of iterator to request
     * @param str3 starting sequence number set on the request
     * @param instant timestamp set on the request after conversion with {@code TimeUtil.toJava}
     * @return the shard iterator contained in the response
     * @throws TransientKinesisException if the call fails in a way {@code wrapExceptions}
     *     classifies as retryable
     */
    public String getShardIterator(String str, String str2, ShardIteratorType shardIteratorType, String str3, Instant instant) throws TransientKinesisException {
        return wrapExceptions(() -> {
            KinesisClient client = this.kinesis.get();
            GetShardIteratorRequest request = GetShardIteratorRequest.builder()
                    .streamName(str)
                    .shardId(str2)
                    .shardIteratorType(shardIteratorType)
                    .startingSequenceNumber(str3)
                    .timestamp(TimeUtil.toJava(instant))
                    .build();
            return client.getShardIterator(request).shardIterator();
        });
    }

    /**
     * Lists the shards of a stream that match the given starting point.
     *
     * @param str stream name
     * @param startingPoint position from which the shard filter is derived
     * @return the shards returned by Kinesis for the derived filter
     * @throws TransientKinesisException if building the filter or listing the shards fails in
     *     a way {@code wrapExceptions} classifies as retryable
     */
    public List<Shard> listShardsAtPoint(String str, StartingPoint startingPoint) throws TransientKinesisException {
        // Building the filter may itself call Kinesis, hence the exception translation.
        ShardFilter filter = wrapExceptions(() -> buildShardFilterForStartingPoint(str, startingPoint));
        return listShards(str, filter);
    }

    /**
     * Translates a starting point into the shard filter to send with ListShards.
     *
     * <p>LATEST and TRIM_HORIZON map directly onto the corresponding filter types; AT_TIMESTAMP
     * is delegated to {@code buildShardFilterForTimestamp}.
     *
     * @param str stream name, only needed for the timestamp case
     * @param startingPoint starting point whose position (and timestamp) is translated
     * @return the shard filter for the starting point
     * @throws IllegalArgumentException if the position is none of the handled constants
     * @throws IOException propagated from {@code buildShardFilterForTimestamp}
     * @throws InterruptedException propagated from {@code buildShardFilterForTimestamp}
     */
    private ShardFilter buildShardFilterForStartingPoint(String str, StartingPoint startingPoint) throws IOException, InterruptedException {
        InitialPositionInStream position = startingPoint.getPosition();
        // Switch on the enum itself instead of indexing an ordinal-keyed lookup table.
        switch (position) {
            case LATEST:
                return ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
            case TRIM_HORIZON:
                return ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
            case AT_TIMESTAMP:
                return buildShardFilterForTimestamp(str, startingPoint.getTimestamp());
            default:
                throw new IllegalArgumentException(String.format("Unrecognized '%s' position to create shard filter with", position));
        }
    }

    /**
     * Chooses the shard filter for a timestamp-based starting point.
     *
     * <p>AT_TRIM_HORIZON is used when the stream was created after the timestamp, or when the
     * timestamp is not later than {@code now - retention + spacing}; otherwise an AT_TIMESTAMP
     * filter carrying the timestamp is returned.
     *
     * @param str stream name, used to look up creation time and retention period
     * @param instant requested starting timestamp
     * @return the shard filter to use for the timestamp
     * @throws IOException propagated from {@code describeStreamSummary}
     * @throws InterruptedException propagated from {@code describeStreamSummary}
     */
    private ShardFilter buildShardFilterForTimestamp(String str, Instant instant) throws IOException, InterruptedException {
        StreamDescriptionSummary summary = describeStreamSummary(str);

        Instant createdAt = TimeUtil.toJoda(summary.streamCreationTimestamp());
        if (createdAt.isAfter(instant)) {
            // The stream did not exist yet at the requested time: start from its beginning.
            return ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
        }

        Instant now = Instant.now();
        Duration retention = Duration.standardHours((long) summary.retentionPeriodHours().intValue());
        Instant earliestUsable = now.minus(retention).plus(SPACING_FOR_TIMESTAMP_LIST_SHARDS_REQUEST_TO_NOT_EXCEED_TRIM_HORIZON);
        if (!instant.isAfter(earliestUsable)) {
            // At or before the retention boundary plus safety margin: fall back to trim horizon.
            return ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
        }
        return ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(TimeUtil.toJava(instant)).build();
    }

    /**
     * Fetches the stream description summary, retrying with backoff while Kinesis answers
     * with {@link LimitExceededException}.
     *
     * <p>Up to {@code DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS} retries are made, starting at
     * {@code DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF}. Once the backoff is exhausted the last
     * {@link LimitExceededException} is rethrown to the caller.
     *
     * @param str stream name
     * @return the summary returned by Kinesis
     * @throws LimitExceededException if the call is still throttled after all retries
     * @throws IOException if the backoff fails while computing the next delay
     * @throws InterruptedException if the thread is interrupted while sleeping between retries
     */
    private StreamDescriptionSummary describeStreamSummary(String str) throws IOException, InterruptedException {
        BackOff backoff = FluentBackoff.DEFAULT
                .withMaxRetries(DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS)
                .withInitialBackoff(DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF)
                .backoff();
        Sleeper sleeper = Sleeper.DEFAULT;
        DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(str).build();
        while (true) {
            try {
                return this.kinesis.get().describeStreamSummary(request).streamDescriptionSummary();
            } catch (LimitExceededException e) {
                // BackOffUtils.next sleeps and returns true while retries remain; when it
                // returns false the budget is spent, so surface the throttling error itself.
                if (!BackOffUtils.next(sleeper, backoff)) {
                    throw e;
                }
            }
        }
    }

    /**
     * Lists the shards selected by an AFTER_SHARD_ID filter for the given shard.
     *
     * @param str stream name
     * @param str2 id of the shard used as the filter's reference shard
     * @return the shards returned by Kinesis for that filter
     * @throws TransientKinesisException if listing fails in a way {@code wrapExceptions}
     *     classifies as retryable
     */
    public List<Shard> listShardsFollowingClosedShard(String str, String str2) throws TransientKinesisException {
        ShardFilter afterShard = ShardFilter.builder()
                .type(ShardFilterType.AFTER_SHARD_ID)
                .shardId(str2)
                .build();
        return listShards(str, afterShard);
    }

    /**
     * Collects all shards matching a filter, following ListShards pagination to the end.
     *
     * <p>The first request names the stream; follow-up requests carry only the continuation
     * token from the previous page. The shard filter and page size are set on every request.
     *
     * @param str stream name
     * @param shardFilter filter sent with every page request
     * @return an immutable list of all shards across all pages, in response order
     * @throws TransientKinesisException if a page request fails in a way
     *     {@code wrapExceptions} classifies as retryable
     */
    private List<Shard> listShards(String str, ShardFilter shardFilter) throws TransientKinesisException {
        return wrapExceptions(() -> {
            ImmutableList.Builder<Shard> collected = ImmutableList.builder();
            String pageToken = null;
            while (true) {
                ListShardsRequest.Builder request = ListShardsRequest.builder()
                        .maxResults(LIST_SHARDS_MAX_RESULTS)
                        .shardFilter(shardFilter);
                if (pageToken == null) {
                    // First page: identify the stream by name.
                    request.streamName(str);
                } else {
                    // Later pages: continue from the token instead of naming the stream.
                    request.nextToken(pageToken);
                }
                ListShardsResponse page = this.kinesis.get().listShards(request.build());
                collected.addAll(page.shards());
                pageToken = page.nextToken();
                if (pageToken == null) {
                    return collected.build();
                }
            }
        });
    }

    /**
     * Fetches records using the limit configured at construction time.
     *
     * @param str shard iterator to read from
     * @param str2 forwarded unchanged to the four-argument overload
     * @param str3 forwarded unchanged to the four-argument overload
     * @return the result of the four-argument overload
     * @throws TransientKinesisException if the underlying call fails in a retryable way
     */
    public GetKinesisRecordsResult getRecords(String str, String str2, String str3) throws TransientKinesisException {
        Integer configuredLimit = this.limit;
        return getRecords(str, str2, str3, configuredLimit);
    }

    /**
     * Fetches one batch of records for a shard iterator and de-aggregates it.
     *
     * @param str shard iterator to read from
     * @param str2 passed through to the result (presumably the stream name - confirm against
     *     {@code GetKinesisRecordsResult})
     * @param str3 passed through to the result (presumably the shard id - confirm against
     *     {@code GetKinesisRecordsResult})
     * @param num limit set on the GetRecords request
     * @return the de-aggregated records together with the next shard iterator and the
     *     milliseconds-behind-latest value from the response
     * @throws TransientKinesisException if the call fails in a way {@code wrapExceptions}
     *     classifies as retryable
     */
    public GetKinesisRecordsResult getRecords(String str, String str2, String str3, Integer num) throws TransientKinesisException {
        return wrapExceptions(() -> {
            KinesisClient client = this.kinesis.get();
            GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(str).limit(num).build();
            GetRecordsResponse response = client.getRecords(request);

            List<KinesisClientRecord> records = deaggregate(response.records());
            String nextIterator = response.nextShardIterator();
            long millisBehind = response.millisBehindLatest().longValue();
            return new GetKinesisRecordsResult(records, nextIterator, millisBehind, str2, str3);
        });
    }

    /**
     * Converts raw Kinesis records to {@link KinesisClientRecord}s and de-aggregates them
     * with {@link AggregatorUtil}.
     *
     * @param list records as returned by GetRecords
     * @return an empty immutable list for empty input, otherwise the de-aggregated records
     */
    public static List<KinesisClientRecord> deaggregate(List<Record> list) {
        if (list.isEmpty()) {
            return ImmutableList.of();
        }
        AggregatorUtil aggregatorUtil = new AggregatorUtil();
        List<KinesisClientRecord> clientRecords = list.stream()
                .map(KinesisClientRecord::fromRecord)
                .collect(Collectors.toList());
        return aggregatorUtil.deaggregate(clientRecords);
    }

    /**
     * Returns the backlog in bytes between the given instant and the current time.
     *
     * @param str stream name
     * @param instant start of the measured range
     * @return the value computed by the three-argument overload with "now" as the range end
     * @throws TransientKinesisException if the metric query fails in a retryable way
     */
    public long getBacklogBytes(String str, Instant instant) throws TransientKinesisException {
        Instant now = new Instant();
        return getBacklogBytes(str, instant, now);
    }

    /**
     * Sums the CloudWatch datapoints of the stream's incoming-bytes metric over a time range.
     *
     * <p>Ranges shorter than one whole minute yield 0 without querying CloudWatch. Each
     * datapoint's sum is truncated to a {@code long} before being added.
     *
     * @param str stream name
     * @param instant start of the range
     * @param instant2 end of the range
     * @return total of the datapoint sums, or 0 for a range under one minute
     * @throws TransientKinesisException if the metric query fails in a way
     *     {@code wrapExceptions} classifies as retryable
     */
    public long getBacklogBytes(String str, Instant instant, Instant instant2) throws TransientKinesisException {
        Long total = wrapExceptions(() -> {
            Minutes span = Minutes.minutesBetween(instant, instant2);
            if (span.isLessThan(Minutes.ONE)) {
                return Long.valueOf(0L);
            }
            CloudWatchClient client = this.cloudWatch.get();
            GetMetricStatisticsRequest request = createMetricStatisticsRequest(str, instant, instant2, span);
            long bytes = client.getMetricStatistics(request).datapoints().stream()
                    .mapToLong(point -> point.sum().longValue())
                    .sum();
            return Long.valueOf(bytes);
        });
        return total.longValue();
    }

    /**
     * Builds the CloudWatch request for the SUM statistic of the stream's incoming-bytes
     * metric.
     *
     * <p>The period is the whole range expressed in seconds
     * ({@code minutes * PERIOD_GRANULARITY_IN_SECONDS}).
     *
     * @param str stream name, used as the value of the stream-name dimension
     * @param instant start time of the query
     * @param instant2 end time of the query
     * @param minutes length of the range in whole minutes
     * @return the assembled request
     */
    GetMetricStatisticsRequest createMetricStatisticsRequest(String str, Instant instant, Instant instant2, Minutes minutes) {
        int periodInSeconds = minutes.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS;
        Dimension streamDimension = Dimension.builder()
                .name(STREAM_NAME_DIMENSION)
                .value(str)
                .build();
        return GetMetricStatisticsRequest.builder()
                .namespace(KINESIS_NAMESPACE)
                .metricName(INCOMING_RECORDS_METRIC)
                .period(periodInSeconds)
                .startTime(TimeUtil.toJava(instant))
                .endTime(TimeUtil.toJava(instant2))
                .statistics(Statistic.SUM)
                .dimensions(streamDimension)
                .build();
    }

    /**
     * Runs a Kinesis/CloudWatch call and translates its failures.
     *
     * <p>Handlers are ordered from most to least specific so that every one is reachable:
     * <ul>
     *   <li>{@link ExpiredIteratorException} is rethrown unchanged;</li>
     *   <li>{@link LimitExceededException} and {@link ProvisionedThroughputExceededException}
     *       become {@code KinesisClientThrottledException};</li>
     *   <li>other {@link SdkServiceException}s become {@link TransientKinesisException} when
     *       they are throttling errors or carry a retryable status code, and are rethrown
     *       unchanged otherwise;</li>
     *   <li>{@link SdkClientException}s become {@link TransientKinesisException} when their
     *       class is in the SDK's retryable set, and are rethrown unchanged otherwise;</li>
     *   <li>anything else is wrapped in a {@link RuntimeException}.</li>
     * </ul>
     *
     * @param callable the call to execute
     * @param <T> result type of the call
     * @return the value returned by {@code callable}
     * @throws TransientKinesisException for failures classified as retryable
     */
    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
            return callable.call();
        } catch (ExpiredIteratorException e) {
            throw e;
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
            throw new KinesisClientThrottledException("Too many requests to Kinesis. Wait some time and retry.", e);
        } catch (SdkServiceException e) {
            if (e.isThrottlingException() || SdkDefaultRetrySetting.RETRYABLE_STATUS_CODES.contains(Integer.valueOf(e.statusCode()))) {
                throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e);
            }
            throw e;
        } catch (SdkClientException e) {
            if (SdkDefaultRetrySetting.RETRYABLE_EXCEPTIONS.contains(e.getClass())) {
                throw new TransientKinesisException("Retryable failure", e);
            }
            throw e;
        } catch (InterruptedException e) {
            // Callables may sleep between retries; keep the interrupt visible to the caller
            // even though the exception itself is wrapped.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        } catch (Exception e) {
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        }
    }

    /**
     * Closes the lazily created CloudWatch and Kinesis clients.
     *
     * <p>The CloudWatch holder is closed first, then the Kinesis holder (resources are closed
     * in reverse declaration order). Both are always attempted; if both fail, the second
     * failure is attached to the first as a suppressed exception. Holders whose client was
     * never created close as a no-op.
     *
     * @throws Exception the first failure raised while closing either client
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try (AutoCloseable kinesisResource = this.kinesis;
                AutoCloseable cloudWatchResource = this.cloudWatch) {
            // Nothing to do here: the try-with-resources statement performs the closing.
        }
    }
}
