package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.KinesisClientUtil;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.class */
/**
 * Unbounded source of {@link KinesisRecord}s driven by a {@link KinesisIO.Read} specification.
 *
 * <p>An instance optionally carries an initial {@link KinesisReaderCheckpoint}. Instances produced
 * by {@link #split} always carry one; an instance created through the single-argument constructor
 * does not, and computes it when {@link #split} is called.
 */
public class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);

    /** Read configuration shared by this source and every source split from it. */
    private final KinesisIO.Read spec;

    /** Checkpoint to start from; {@code null} until the source has been split. */
    private final KinesisReaderCheckpoint initialCheckpoint;

    /**
     * Creates a source without an initial checkpoint.
     *
     * @param read the read specification; must not be {@code null}
     */
    public KinesisSource(KinesisIO.Read read) {
        this(read, null);
    }

    /**
     * Creates a source that starts from the given checkpoint.
     *
     * @param read the read specification; must not be {@code null}
     * @param kinesisReaderCheckpoint the checkpoint to start from, or {@code null} if not yet known
     */
    KinesisSource(KinesisIO.Read read, KinesisReaderCheckpoint kinesisReaderCheckpoint) {
        this.spec = Preconditions.checkNotNull(read);
        this.initialCheckpoint = kinesisReaderCheckpoint;
    }

    /**
     * Splits this source into one source per partition of its checkpoint.
     *
     * <p>If no initial checkpoint is set, one is generated by listing the stream's shards with a
     * short-lived {@link KinesisClient} that is closed before this method returns.
     *
     * @param i the split count handed to {@link KinesisReaderCheckpoint#splitInto}
     * @param pipelineOptions pipeline options, viewed as {@link AwsOptions} to build the client
     * @return a new list holding one {@code KinesisSource} per checkpoint partition
     * @throws Exception if generating the initial checkpoint or closing the client fails
     */
    @Override
    public List<KinesisSource> split(int i, PipelineOptions pipelineOptions) throws Exception {
        KinesisReaderCheckpoint checkpoint;
        if (this.initialCheckpoint != null) {
            checkpoint = this.initialCheckpoint;
        } else {
            // try-with-resources closes the client on both the normal and the exceptional path,
            // recording any close failure as a suppressed exception.
            try (KinesisClient kinesisClient = ClientBuilderFactory.buildClient(
                    pipelineOptions.as(AwsOptions.class),
                    KinesisClient.builder(),
                    this.spec.getClientConfiguration())) {
                checkpoint = generateInitCheckpoint(this.spec, kinesisClient);
            }
        }

        List<KinesisSource> sources = new ArrayList<>();
        for (KinesisReaderCheckpoint partition : checkpoint.splitInto(i)) {
            sources.add(new KinesisSource(this.spec, partition));
        }
        return sources;
    }

    /**
     * Creates a reader starting at the given checkpoint, or at this source's initial checkpoint
     * when none is given.
     *
     * @param pipelineOptions pipeline options passed on to the reader's client setup
     * @param kinesisReaderCheckpoint the checkpoint to resume from, or {@code null}
     * @return a reader created by {@link #initReader}
     * @throws IOException if no checkpoint is given and this source has no initial checkpoint
     */
    @Override
    public UnboundedSource.UnboundedReader<KinesisRecord> createReader(PipelineOptions pipelineOptions, KinesisReaderCheckpoint kinesisReaderCheckpoint) throws IOException {
        KinesisReaderCheckpoint startingCheckpoint;
        if (kinesisReaderCheckpoint != null) {
            LOG.info("Got checkpoint mark {}", kinesisReaderCheckpoint);
            startingCheckpoint = kinesisReaderCheckpoint;
        } else {
            try {
                LOG.info("No checkpointMark specified, fall back to initial {}", this.initialCheckpoint);
                startingCheckpoint = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(this.initialCheckpoint);
            } catch (Exception e) {
                // Surface the failure through the checked exception this method declares.
                throw new IOException(e);
            }
        }
        return initReader(this.spec, pipelineOptions, startingCheckpoint, this);
    }

    /** Returns a {@link SerializableCoder} for {@link KinesisReaderCheckpoint}. */
    @Override
    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
        return SerializableCoder.of(KinesisReaderCheckpoint.class);
    }

    /** Returns the {@link KinesisRecordCoder} used for emitted records. */
    @Override
    public Coder<KinesisRecord> getOutputCoder() {
        return KinesisRecordCoder.of();
    }

    /**
     * Resolves the consumer ARN to use for the stream named in the read specification.
     *
     * <p>An entry for the stream in {@link KinesisIOOptions#getKinesisIOConsumerArns()} takes
     * precedence over {@code read.getConsumerArn()}.
     *
     * @param read the read specification; its stream name must not be {@code null}
     * @param pipelineOptions pipeline options, viewed as {@link KinesisIOOptions}
     * @return the resolved consumer ARN, possibly {@code null}
     */
    static String resolveConsumerArn(KinesisIO.Read read, PipelineOptions pipelineOptions) {
        String streamName = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(read.getStreamName());
        Map<String, String> consumerArnsByStream = pipelineOptions.as(KinesisIOOptions.class).getKinesisIOConsumerArns();
        // containsKey (not a null check on get) is deliberate here: a key mapped to null yields
        // null instead of falling back to the ARN configured on the read specification.
        return consumerArnsByStream.containsKey(streamName)
                ? consumerArnsByStream.get(streamName)
                : read.getConsumerArn();
    }

    /**
     * Creates the reader implementation matching the resolved consumer ARN: a
     * {@link KinesisReader} when no ARN resolves, an {@link EFOKinesisReader} otherwise.
     *
     * @param read the read specification
     * @param pipelineOptions pipeline options used to resolve the ARN and build clients
     * @param kinesisReaderCheckpoint the checkpoint the reader starts from
     * @param kinesisSource the source handed to the reader
     * @return the newly created reader
     */
    UnboundedSource.UnboundedReader<KinesisRecord> initReader(KinesisIO.Read read, PipelineOptions pipelineOptions, KinesisReaderCheckpoint kinesisReaderCheckpoint, KinesisSource kinesisSource) {
        String consumerArn = resolveConsumerArn(read, pipelineOptions);
        if (consumerArn == null) {
            LOG.info("Creating new reader using {}", kinesisReaderCheckpoint);
            return new KinesisReader(read, createSimplifiedKinesisClient(pipelineOptions), kinesisReaderCheckpoint, kinesisSource);
        }
        LOG.info("Creating new EFO reader using {}", kinesisReaderCheckpoint);
        return new EFOKinesisReader(read, consumerArn, createAsyncClient(read, pipelineOptions), kinesisReaderCheckpoint, kinesisSource);
    }

    /**
     * Builds a {@link SimplifiedKinesisClient} from suppliers of a Kinesis client and a CloudWatch
     * client, both configured from this source's client configuration, plus the request records
     * limit of the read specification.
     */
    private SimplifiedKinesisClient createSimplifiedKinesisClient(PipelineOptions pipelineOptions) {
        AwsOptions awsOptions = pipelineOptions.as(AwsOptions.class);
        ClientConfiguration clientConfiguration = this.spec.getClientConfiguration();
        return new SimplifiedKinesisClient(
                () -> ClientBuilderFactory.buildClient(awsOptions, KinesisClient.builder(), clientConfiguration),
                () -> ClientBuilderFactory.buildClient(awsOptions, CloudWatchClient.builder(), clientConfiguration),
                this.spec.getRequestRecordsLimit());
    }

    /**
     * Builds a {@link KinesisAsyncClient} from a builder passed through
     * {@link KinesisClientUtil#adjustKinesisClientBuilder}, using the read specification's client
     * configuration.
     */
    private static KinesisAsyncClient createAsyncClient(KinesisIO.Read read, PipelineOptions pipelineOptions) {
        return ClientBuilderFactory.buildClient(
                pipelineOptions.as(AwsOptions.class),
                KinesisClientUtil.adjustKinesisClientBuilder(KinesisAsyncClient.builder()),
                read.getClientConfiguration());
    }

    /**
     * Generates the initial checkpoint: one {@link ShardCheckpoint} per shard listed for the
     * stream at the configured initial position.
     *
     * @param read the read specification; stream name and initial position must not be {@code null}
     * @param kinesisClient the client used to list shards; not closed by this method
     * @return a checkpoint covering every listed shard
     * @throws IOException declared for the shard listing call
     * @throws InterruptedException declared for the shard listing call
     */
    private KinesisReaderCheckpoint generateInitCheckpoint(KinesisIO.Read read, KinesisClient kinesisClient) throws IOException, InterruptedException {
        String streamName = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(read.getStreamName());
        StartingPoint startingPoint = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(read.getInitialPosition());
        List<Shard> shards = ShardListingUtils.listShardsAtPoint(kinesisClient, streamName, startingPoint);
        LOG.info("Creating a checkpoint with following shards {} at {}", shards, startingPoint);
        List<ShardCheckpoint> shardCheckpoints = shards.stream()
                .map(shard -> new ShardCheckpoint(streamName, shard.shardId(), startingPoint))
                .collect(Collectors.toList());
        return new KinesisReaderCheckpoint(shardCheckpoints);
    }
}
