package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.class */
/**
 * Unbounded source that reads {@link KinesisRecord}s and checkpoints its progress as a
 * {@link KinesisReaderCheckpoint}.
 *
 * <p>The source is configured by a {@link KinesisIO.Read} spec and a {@link CheckpointGenerator}
 * that supplies the checkpoint a reader starts from.
 */
public class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);

    /** Read configuration this source was created from. */
    private final KinesisIO.Read spec;

    /** Produces the checkpoint used when splitting and when a reader has no checkpoint to resume. */
    private final CheckpointGenerator checkpointGenerator;

    /**
     * Creates a source whose initial checkpoint is generated dynamically from the stream name and
     * initial position of the given spec.
     *
     * @param read the read configuration; must not be null
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisSource(KinesisIO.Read read) {
        this(read, new DynamicCheckpointGenerator(read.getStreamName(), read.getInitialPosition()));
    }

    /**
     * Creates a source with an explicit checkpoint generator.
     *
     * @param read the read configuration; must not be null
     * @param checkpointGenerator the generator for the starting checkpoint; must not be null
     */
    private KinesisSource(KinesisIO.Read read, CheckpointGenerator checkpointGenerator) {
        this.spec = Preconditions.checkNotNull(read);
        this.checkpointGenerator = Preconditions.checkNotNull(checkpointGenerator);
    }

    /**
     * Builds a {@link SimplifiedKinesisClient} for this source.
     *
     * <p>If the spec carries an {@link AWSClientsProvider}, the Kinesis and CloudWatch clients are
     * obtained from it; otherwise they are built through {@link ClientBuilderFactory} from the
     * pipeline's {@link AwsOptions} and the spec's {@link ClientConfiguration}.
     *
     * @param pipelineOptions pipeline options, viewed as {@link AwsOptions}
     * @return a new client wrapper; the caller is responsible for closing it
     */
    private SimplifiedKinesisClient createClient(PipelineOptions pipelineOptions) {
        AwsOptions awsOptions = pipelineOptions.as(AwsOptions.class);
        AWSClientsProvider provider = this.spec.getAWSClientsProvider();

        Supplier<KinesisClient> kinesisSupplier;
        Supplier<CloudWatchClient> cloudWatchSupplier;
        if (provider != null) {
            kinesisSupplier = provider::getKinesisClient;
            cloudWatchSupplier = provider::getCloudWatchClient;
        } else {
            ClientConfiguration clientConfiguration = this.spec.getClientConfiguration();
            kinesisSupplier =
                () -> ClientBuilderFactory.buildClient(awsOptions, KinesisClient.builder(), clientConfiguration);
            cloudWatchSupplier =
                () -> ClientBuilderFactory.buildClient(awsOptions, CloudWatchClient.builder(), clientConfiguration);
        }
        return new SimplifiedKinesisClient(kinesisSupplier, cloudWatchSupplier, this.spec.getRequestRecordsLimit());
    }

    /**
     * Generates a checkpoint with a temporary client, splits it into parts and wraps each part in
     * its own source backed by a {@link StaticCheckpointGenerator}.
     *
     * @param i the split count passed to {@code KinesisReaderCheckpoint#splitInto}
     * @param pipelineOptions pipeline options used to create the temporary client
     * @return one source per checkpoint partition
     * @throws Exception if checkpoint generation or closing the client fails
     */
    @Override
    public List<KinesisSource> split(int i, PipelineOptions pipelineOptions) throws Exception {
        // try-with-resources closes the client exactly once and records a close() failure as a
        // suppressed exception rather than letting it replace the primary failure.
        try (SimplifiedKinesisClient client = createClient(pipelineOptions)) {
            KinesisReaderCheckpoint checkpoint = this.checkpointGenerator.generate(client);
            List<KinesisSource> sources = new ArrayList<>();
            for (KinesisReaderCheckpoint partition : checkpoint.splitInto(i)) {
                sources.add(new KinesisSource(this.spec, new StaticCheckpointGenerator(partition)));
            }
            return sources;
        }
    }

    /**
     * Creates a reader for this source.
     *
     * @param pipelineOptions pipeline options used to create the reader's client
     * @param kinesisReaderCheckpoint checkpoint to resume from, or null to use this source's own
     *     checkpoint generator
     * @return a new {@link KinesisReader}
     */
    @Override
    public UnboundedSource.UnboundedReader<KinesisRecord> createReader(PipelineOptions pipelineOptions, KinesisReaderCheckpoint kinesisReaderCheckpoint) {
        // A supplied checkpoint takes precedence over the generator configured on the source.
        CheckpointGenerator generator =
            kinesisReaderCheckpoint != null
                ? new StaticCheckpointGenerator(kinesisReaderCheckpoint)
                : this.checkpointGenerator;
        LOG.info("Creating new reader using {}", generator);
        return new KinesisReader(this.spec, createClient(pipelineOptions), generator, this);
    }

    /** Returns a {@link SerializableCoder} for {@link KinesisReaderCheckpoint}. */
    @Override
    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
        return SerializableCoder.of(KinesisReaderCheckpoint.class);
    }

    /** Returns the {@link KinesisRecordCoder} used for emitted records. */
    @Override
    public Coder<KinesisRecord> getOutputCoder() {
        return KinesisRecordCoder.of();
    }
}
