package org.apache.flink.streaming.connectors.kinesis.proxy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.class */
public class KinesisProxyV2 implements KinesisProxyV2Interface {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
    private final KinesisAsyncClient kinesisAsyncClient;
    private final SdkAsyncHttpClient httpClient;
    private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
    private final FullJitterBackoff backoff;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2$ResponseSupplier.class */
    public interface ResponseSupplier<T> {
        T get() throws ExecutionException, InterruptedException;
    }

    public KinesisProxyV2(KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient sdkAsyncHttpClient, FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration, FullJitterBackoff fullJitterBackoff) {
        this.kinesisAsyncClient = (KinesisAsyncClient) Preconditions.checkNotNull(kinesisAsyncClient);
        this.httpClient = sdkAsyncHttpClient;
        this.fanOutRecordPublisherConfiguration = fanOutRecordPublisherConfiguration;
        this.backoff = fullJitterBackoff;
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest subscribeToShardRequest, SubscribeToShardResponseHandler subscribeToShardResponseHandler) {
        return this.kinesisAsyncClient.subscribeToShard(subscribeToShardRequest, subscribeToShardResponseHandler);
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public void close() {
        this.kinesisAsyncClient.close();
        this.httpClient.close();
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public DescribeStreamSummaryResponse describeStreamSummary(String str) throws InterruptedException, ExecutionException {
        DescribeStreamSummaryRequest describeStreamSummaryRequest = (DescribeStreamSummaryRequest) DescribeStreamSummaryRequest.builder().streamName(str).mo3740build();
        return (DescribeStreamSummaryResponse) invokeWithRetryAndBackoff(() -> {
            return this.kinesisAsyncClient.describeStreamSummary(describeStreamSummaryRequest).get();
        }, this.fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), this.fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries());
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public DescribeStreamConsumerResponse describeStreamConsumer(String str, String str2) throws InterruptedException, ExecutionException {
        return describeStreamConsumer((DescribeStreamConsumerRequest) DescribeStreamConsumerRequest.builder().streamARN(str).consumerName(str2).mo3740build());
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public DescribeStreamConsumerResponse describeStreamConsumer(String str) throws InterruptedException, ExecutionException {
        return describeStreamConsumer((DescribeStreamConsumerRequest) DescribeStreamConsumerRequest.builder().consumerARN(str).mo3740build());
    }

    private DescribeStreamConsumerResponse describeStreamConsumer(DescribeStreamConsumerRequest describeStreamConsumerRequest) throws InterruptedException, ExecutionException {
        return (DescribeStreamConsumerResponse) invokeWithRetryAndBackoff(() -> {
            return this.kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
        }, this.fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), this.fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries());
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public RegisterStreamConsumerResponse registerStreamConsumer(String str, String str2) throws InterruptedException, ExecutionException {
        RegisterStreamConsumerRequest registerStreamConsumerRequest = (RegisterStreamConsumerRequest) RegisterStreamConsumerRequest.builder().streamARN(str).consumerName(str2).mo3740build();
        return (RegisterStreamConsumerResponse) invokeWithRetryAndBackoff(() -> {
            return this.kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
        }, this.fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), this.fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), this.fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), this.fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries());
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface
    public DeregisterStreamConsumerResponse deregisterStreamConsumer(String str) throws InterruptedException, ExecutionException {
        DeregisterStreamConsumerRequest deregisterStreamConsumerRequest = (DeregisterStreamConsumerRequest) DeregisterStreamConsumerRequest.builder().consumerARN(str).mo3740build();
        return (DeregisterStreamConsumerResponse) invokeWithRetryAndBackoff(() -> {
            return this.kinesisAsyncClient.deregisterStreamConsumer(deregisterStreamConsumerRequest).get();
        }, this.fanOutRecordPublisherConfiguration.getDeregisterStreamBaseBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDeregisterStreamMaxBackoffMillis(), this.fanOutRecordPublisherConfiguration.getDeregisterStreamExpConstant(), this.fanOutRecordPublisherConfiguration.getDeregisterStreamMaxRetries());
    }

    private <T> T invokeWithRetryAndBackoff(ResponseSupplier<T> responseSupplier, long j, long j2, double d, int i) throws InterruptedException, ExecutionException {
        T t = null;
        int i2 = 0;
        while (i2 < i && t == null) {
            try {
                t = responseSupplier.get();
            } catch (Exception e) {
                if (!AwsV2Util.isRecoverableException(e)) {
                    throw e;
                }
                i2++;
                long calculateFullJitterBackoff = this.backoff.calculateFullJitterBackoff(j, j2, d, i2);
                LOG.warn("Encountered recoverable error: {}. Backing off for {} millis.", new Object[]{e.getClass().getSimpleName(), Long.valueOf(calculateFullJitterBackoff), e});
                this.backoff.sleep(calculateFullJitterBackoff);
            }
        }
        if (t == null) {
            throw new RuntimeException("Retries exceeded - all " + i + " retry attempts failed.");
        }
        return t;
    }
}
