package org.apache.flink.connector.kinesis.source.reader.fanout;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.s3.model.InstructionFileId;
import org.apache.flink.kinesis.shaded.io.netty.handler.timeout.ReadTimeoutException;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscription;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.InternalFailureException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.class */
public class FanOutKinesisShardSubscription {
    private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSubscription.class);
    private static final List<Class<? extends Throwable>> RECOVERABLE_EXCEPTIONS = Arrays.asList(InternalFailureException.class, ResourceNotFoundException.class, KinesisException.class, ResourceInUseException.class, ReadTimeoutException.class, TimeoutException.class, IOException.class, LimitExceededException.class);
    private final AsyncStreamProxy kinesis;
    private final String consumerArn;
    private final String shardId;
    private final Duration subscriptionTimeout;
    private final BlockingQueue<SubscribeToShardEvent> eventQueue = new LinkedBlockingQueue(2);
    private final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
    private final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    private StartingPosition startingPosition;
    private FanOutShardSubscriber shardSubscriber;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription$FanOutShardSubscriber.class */
    public class FanOutShardSubscriber implements Subscriber<SubscribeToShardEventStream> {
        private final CountDownLatch subscriptionLatch;
        private Subscription subscription;

        private FanOutShardSubscriber(CountDownLatch countDownLatch) {
            this.subscriptionLatch = countDownLatch;
        }

        public void requestRecords() {
            this.subscription.request(1L);
        }

        public void cancel() {
            if (!FanOutKinesisShardSubscription.this.subscriptionActive.get()) {
                FanOutKinesisShardSubscription.LOG.warn("Trying to cancel inactive subscription. Ignoring.");
                return;
            }
            FanOutKinesisShardSubscription.this.subscriptionActive.set(false);
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            FanOutKinesisShardSubscription.LOG.info("Successfully subscribed to shard {} at {} using consumer {}.", new Object[]{FanOutKinesisShardSubscription.this.shardId, FanOutKinesisShardSubscription.this.startingPosition, FanOutKinesisShardSubscription.this.consumerArn});
            this.subscription = subscription;
            this.subscriptionLatch.countDown();
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
            subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() { // from class: org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription.FanOutShardSubscriber.1
                @Override // org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler.Visitor
                public void visit(SubscribeToShardEvent subscribeToShardEvent) {
                    try {
                        FanOutKinesisShardSubscription.LOG.debug("Received event: {}, {}", subscribeToShardEvent.getClass().getSimpleName(), subscribeToShardEvent);
                        FanOutKinesisShardSubscription.this.eventQueue.put(subscribeToShardEvent);
                        FanOutKinesisShardSubscription.this.startingPosition = StartingPosition.continueFromSequenceNumber(subscribeToShardEvent.continuationSequenceNumber());
                        FanOutShardSubscriber.this.requestRecords();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new KinesisStreamsSourceException("Interrupted while adding Kinesis record to internal buffer.", e);
                    }
                }
            });
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            if (FanOutKinesisShardSubscription.this.subscriptionException.compareAndSet(null, th)) {
                return;
            }
            FanOutKinesisShardSubscription.LOG.warn("Another subscription exception has been queued, ignoring subsequent exceptions", th);
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onComplete() {
            FanOutKinesisShardSubscription.LOG.info("Subscription complete - {} ({})", FanOutKinesisShardSubscription.this.shardId, FanOutKinesisShardSubscription.this.consumerArn);
            cancel();
            FanOutKinesisShardSubscription.this.activateSubscription();
        }
    }

    public FanOutKinesisShardSubscription(AsyncStreamProxy asyncStreamProxy, String str, String str2, StartingPosition startingPosition, Duration duration) {
        this.kinesis = asyncStreamProxy;
        this.consumerArn = str;
        this.shardId = str2;
        this.startingPosition = startingPosition;
        this.subscriptionTimeout = duration;
    }

    public void activateSubscription() {
        LOG.info("Activating subscription to shard {} with starting position {} for consumer {}.", new Object[]{this.shardId, this.startingPosition, this.consumerArn});
        if (this.subscriptionActive.get()) {
            LOG.warn("Skipping activation of subscription since it is already active.");
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.shardSubscriber = new FanOutShardSubscriber(countDownLatch);
        this.kinesis.subscribeToShard(this.consumerArn, this.shardId, this.startingPosition, SubscribeToShardResponseHandler.builder().subscriber(() -> {
            return this.shardSubscriber;
        }).onError(th -> {
            if (countDownLatch.getCount() > 0) {
                terminateSubscription(th);
                countDownLatch.countDown();
            }
        }).build()).exceptionally(th2 -> {
            if (ExceptionUtils.findThrowable(th2, ResourceInUseException.class).isPresent()) {
                countDownLatch.countDown();
                return null;
            }
            LOG.error("Error subscribing to shard {} with starting position {} for consumer {}.", new Object[]{this.shardId, this.startingPosition, this.consumerArn, th2});
            terminateSubscription(th2);
            return null;
        });
        CompletableFuture.runAsync(() -> {
            try {
                if (countDownLatch.await(this.subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    LOG.info("Successfully subscribed to shard {} with starting position {} for consumer {}.", new Object[]{this.shardId, this.startingPosition, this.consumerArn});
                    this.subscriptionActive.set(true);
                    this.shardSubscriber.requestRecords();
                } else {
                    String str = "Timeout when subscribing to shard " + this.shardId + " with starting position " + this.startingPosition + " for consumer " + this.consumerArn + InstructionFileId.DOT;
                    LOG.error(str);
                    terminateSubscription(new TimeoutException(str));
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for subscription to complete.", e);
                terminateSubscription(e);
                Thread.currentThread().interrupt();
            }
        });
    }

    private void terminateSubscription(Throwable th) {
        if (!this.subscriptionException.compareAndSet(null, th)) {
            LOG.warn("Another subscription exception has been queued, ignoring subsequent exceptions", th);
        }
        this.shardSubscriber.cancel();
    }

    public SubscribeToShardEvent nextEvent() {
        Throwable andSet = this.subscriptionException.getAndSet(null);
        if (andSet == null) {
            if (this.subscriptionActive.get()) {
                return this.eventQueue.poll();
            }
            LOG.debug("Subscription to shard {} for consumer {} is not yet active. Skipping.", this.shardId, this.consumerArn);
            return null;
        }
        if (ExceptionUtils.findThrowable(andSet, ResourceInUseException.class).isPresent()) {
            return null;
        }
        if (andSet instanceof ResourceNotFoundException) {
            throw ((ResourceNotFoundException) andSet);
        }
        Optional findFirst = RECOVERABLE_EXCEPTIONS.stream().map(cls -> {
            return ExceptionUtils.findThrowable(andSet, cls);
        }).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).findFirst();
        if (!findFirst.isPresent()) {
            LOG.error("Subscription encountered unrecoverable exception.", andSet);
            throw new KinesisStreamsSourceException("Subscription encountered unrecoverable exception.", andSet);
        }
        LOG.warn("Recoverable exception encountered while subscribing to shard. Ignoring.", (Throwable) findFirst.get());
        this.shardSubscriber.cancel();
        activateSubscription();
        return null;
    }
}
