package org.apache.flink.connector.kinesis.source.reader.fanout;

import java.time.Instant;
import java.time.temporal.TemporalAmount;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.arns.Arn;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.ChunkContentUtils;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.class */
public class StreamConsumerRegistrar {
    private static final Logger LOG = LoggerFactory.getLogger(StreamConsumerRegistrar.class);
    private final Configuration sourceConfig;
    private final String streamArn;
    private final StreamProxy kinesisStreamProxy;
    private String consumerArn;

    public StreamConsumerRegistrar(Configuration configuration, String str, StreamProxy streamProxy) {
        this.sourceConfig = configuration;
        this.streamArn = str;
        this.kinesisStreamProxy = streamProxy;
    }

    public void registerStreamConsumer() {
        if (this.sourceConfig.get(KinesisSourceConfigOptions.READER_TYPE) != KinesisSourceConfigOptions.ReaderType.EFO) {
            return;
        }
        String str = (String) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_CONSUMER_NAME);
        Preconditions.checkNotNull(str, "For EFO reader type, EFO consumer name must be specified.");
        Preconditions.checkArgument(!str.isEmpty(), "For EFO reader type, EFO consumer name cannot be empty.");
        switch ((KinesisSourceConfigOptions.ConsumerLifecycle) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE)) {
            case JOB_MANAGED:
                try {
                    LOG.info("Registering stream consumer - {}::{}", this.streamArn, str);
                    RegisterStreamConsumerResponse registerStreamConsumer = this.kinesisStreamProxy.registerStreamConsumer(this.streamArn, str);
                    this.consumerArn = registerStreamConsumer.consumer().consumerARN();
                    LOG.info("Registered stream consumer - {}::{}", this.streamArn, registerStreamConsumer.consumer().consumerARN());
                    return;
                } catch (ResourceInUseException e) {
                    LOG.warn("Found existing consumer {} on stream {}. Proceeding to read from consumer.", new Object[]{str, this.streamArn, e});
                    return;
                }
            case SELF_MANAGED:
                LOG.info("Discovered stream consumer - {}", this.kinesisStreamProxy.describeStreamConsumer(this.streamArn, str));
                return;
            default:
                throw new IllegalArgumentException("Unsupported EFO consumer lifecycle: " + this.sourceConfig.get(KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE));
        }
    }

    public void deregisterStreamConsumer() {
        if (this.sourceConfig.get(KinesisSourceConfigOptions.READER_TYPE) == KinesisSourceConfigOptions.ReaderType.EFO && this.sourceConfig.get(KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE) == KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED) {
            LOG.info("De-registering stream consumer - {}", this.consumerArn);
            if (this.consumerArn == null) {
                LOG.warn("Unable to deregister stream consumer as there was no consumer ARN stored in the StreamConsumerRegistrar. There may be leaked EFO consumers on the Kinesis stream.");
                return;
            }
            this.kinesisStreamProxy.deregisterStreamConsumer(this.consumerArn);
            LOG.info("De-registered stream consumer - {}", this.consumerArn);
            Instant plus = Instant.now().plus((TemporalAmount) this.sourceConfig.get(KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT));
            String consumerNameFromArn = getConsumerNameFromArn(this.consumerArn);
            while (Instant.now().isBefore(plus)) {
                try {
                    LOG.info("Waiting for stream consumer to be deregistered - {} {} {}", new Object[]{this.streamArn, consumerNameFromArn, this.kinesisStreamProxy.describeStreamConsumer(this.streamArn, consumerNameFromArn).consumerDescription().consumerStatusAsString()});
                } catch (ResourceNotFoundException e) {
                    LOG.info("Stream consumer {} has been deregistered", this.consumerArn);
                    return;
                }
            }
            LOG.warn("Timed out waiting for stream consumer to be deregistered. There may be leaked EFO consumers on the Kinesis stream.");
        }
    }

    private String getConsumerNameFromArn(String str) {
        return StringUtils.substringBetween(Arn.fromString(str).resource().qualifier().orElseThrow(() -> {
            return new IllegalArgumentException("Unable to parse consumer name from consumer ARN");
        }), "/", ChunkContentUtils.HEADER_COLON_SEPARATOR);
    }
}
