package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.util.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.class */
public class EFOKinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(EFOKinesisReader.class);
    private final KinesisIO.Read spec;
    private final String consumerArn;
    private final KinesisAsyncClient kinesis;
    private final KinesisSource source;
    private final KinesisReaderCheckpoint initCheckpoint;
    private KinesisRecord currentRecord = null;
    private EFOShardSubscribersPool shardSubscribersPool = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EFOKinesisReader(KinesisIO.Read read, String str, KinesisAsyncClient kinesisAsyncClient, KinesisReaderCheckpoint kinesisReaderCheckpoint, KinesisSource kinesisSource) {
        this.spec = read;
        this.consumerArn = str;
        this.kinesis = kinesisAsyncClient;
        this.initCheckpoint = kinesisReaderCheckpoint;
        this.source = kinesisSource;
    }

    public boolean start() throws IOException {
        LOG.info("Starting reader using {}", this.initCheckpoint);
        try {
            this.shardSubscribersPool = createPool();
            shardSubscribersPool().start(this.initCheckpoint);
            return advance();
        } catch (TransientKinesisException e) {
            throw new IOException(e);
        }
    }

    private EFOShardSubscribersPool shardSubscribersPool() {
        return (EFOShardSubscribersPool) Preconditions.checkStateNotNull(this.shardSubscribersPool, "Reader was not started");
    }

    public boolean advance() throws IOException {
        this.currentRecord = shardSubscribersPool().getNextRecord();
        return this.currentRecord != null;
    }

    public byte[] getCurrentRecordId() throws NoSuchElementException {
        return getOrThrow().getUniqueId();
    }

    /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
    public KinesisRecord m13getCurrent() throws NoSuchElementException {
        return getOrThrow();
    }

    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return getOrThrow().getApproximateArrivalTimestamp();
    }

    public void close() throws IOException {
        try {
            KinesisAsyncClient kinesisAsyncClient = this.kinesis;
            try {
                shardSubscribersPool().stop();
                if (kinesisAsyncClient != null) {
                    kinesisAsyncClient.close();
                }
            } finally {
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new IOException(e2);
        }
    }

    public Instant getWatermark() {
        return shardSubscribersPool().getWatermark();
    }

    public UnboundedSource.CheckpointMark getCheckpointMark() {
        return shardSubscribersPool().getCheckpointMark();
    }

    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
    public UnboundedSource<KinesisRecord, ?> m12getCurrentSource() {
        return this.source;
    }

    EFOShardSubscribersPool createPool() throws TransientKinesisException {
        return new EFOShardSubscribersPool(this.spec, this.consumerArn, this.kinesis);
    }

    private KinesisRecord getOrThrow() throws NoSuchElementException {
        if (this.currentRecord != null) {
            return this.currentRecord;
        }
        throw new NoSuchElementException();
    }
}
