package io.trino.plugin.kinesis;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;
import io.trino.decoder.FieldValueProviders;
import io.trino.decoder.RowDecoder;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
import io.trino.spi.type.Type;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.zip.GZIPInputStream;

/* loaded from: input_file:io/trino/plugin/kinesis/KinesisRecordSet.class */
public class KinesisRecordSet implements RecordSet {
    private static final int MILLIS_BEHIND_LIMIT = 10000;
    private static final Logger log = Logger.get(KinesisRecordSet.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private final KinesisSplit split;
    private final ConnectorSession session;
    private final KinesisClientProvider clientManager;
    private final RowDecoder messageDecoder;
    private final List<KinesisColumnHandle> columnHandles;
    private final List<Type> columnTypes;
    private final int batchSize;
    private final int maxBatches;
    private final int fetchAttempts;
    private final long sleepTime;
    private final boolean isLogBatches;
    private final boolean checkpointEnabled;
    private final KinesisShardCheckpointer shardCheckpointer;
    private String lastReadSequenceNumber;

    /* loaded from: input_file:io/trino/plugin/kinesis/KinesisRecordSet$KinesisRecordCursor.class */
    public class KinesisRecordCursor implements RecordCursor {
        private final FieldValueProvider[] currentRowValues;
        private long batchesRead;
        private long messagesRead;
        private long totalBytes;
        private long totalMessages;
        private long lastReadTime;
        private String shardIterator;
        private List<Record> kinesisRecords;
        private Iterator<Record> listIterator;
        private GetRecordsRequest getRecordsRequest;
        private GetRecordsResult getRecordsResult;

        public KinesisRecordCursor() {
            this.currentRowValues = new FieldValueProvider[KinesisRecordSet.this.columnHandles.size()];
        }

        public long getCompletedBytes() {
            return this.totalBytes;
        }

        public long getReadTimeNanos() {
            return 0L;
        }

        public Type getType(int i) {
            Preconditions.checkArgument(i < KinesisRecordSet.this.columnHandles.size(), "Invalid field index");
            return KinesisRecordSet.this.columnHandles.get(i).getType();
        }

        public boolean advanceNextPosition() {
            if (this.shardIterator == null && this.getRecordsRequest == null) {
                getIterator();
                KinesisRecordSet.log.debug("(%s:%s) Starting read.  Retrieved first shard iterator from AWS Kinesis.", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId()});
            }
            if (this.getRecordsRequest == null || (!this.listIterator.hasNext() && shouldGetMoreRecords())) {
                getKinesisRecords();
            }
            if (this.listIterator.hasNext()) {
                return nextRow();
            }
            KinesisRecordSet.log.debug("(%s:%s) Read all of the records from the shard:  %d batches and %d messages and %d total bytes.", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId(), Long.valueOf(this.batchesRead), Long.valueOf(this.totalMessages), Long.valueOf(this.totalBytes)});
            return false;
        }

        private boolean shouldGetMoreRecords() {
            return this.shardIterator != null && this.batchesRead < ((long) KinesisRecordSet.this.maxBatches) && getMillisBehindLatest() > 10000;
        }

        private void getKinesisRecords() throws ResourceNotFoundException {
            boolean z = false;
            for (int i = 0; !z && i < KinesisRecordSet.this.fetchAttempts; i++) {
                Duration nanosSince = Duration.nanosSince(this.lastReadTime);
                if (nanosSince.toMillis() <= KinesisRecordSet.this.sleepTime) {
                    try {
                        Thread.sleep(nanosSince.toMillis());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("thread interrupted");
                    }
                }
                this.getRecordsRequest = new GetRecordsRequest();
                this.getRecordsRequest.setShardIterator(this.shardIterator);
                this.getRecordsRequest.setLimit(Integer.valueOf(KinesisRecordSet.this.batchSize));
                this.getRecordsResult = KinesisRecordSet.this.clientManager.getClient().getRecords(this.getRecordsRequest);
                this.lastReadTime = System.nanoTime();
                this.shardIterator = this.getRecordsResult.getNextShardIterator();
                this.kinesisRecords = this.getRecordsResult.getRecords();
                if (KinesisRecordSet.this.isLogBatches) {
                    KinesisRecordSet.log.info("(%s:%s) Fetched %d records from Kinesis.  MillisBehindLatest=%d Attempt=%d", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId(), Integer.valueOf(this.kinesisRecords.size()), this.getRecordsResult.getMillisBehindLatest(), Integer.valueOf(i)});
                }
                z = this.kinesisRecords.size() > 0 || getMillisBehindLatest() <= 10000;
            }
            this.listIterator = this.kinesisRecords.iterator();
            this.batchesRead++;
            this.messagesRead += this.kinesisRecords.size();
        }

        private boolean nextRow() {
            Record next = this.listIterator.next();
            KinesisRecordSet.log.debug("(%s:%s) Reading record with partition key %s", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId(), next.getPartitionKey()});
            byte[] bArr = KinesisRecordSet.EMPTY_BYTE_ARRAY;
            ByteBuffer data = next.getData();
            if (data != null) {
                bArr = new byte[data.remaining()];
                data.get(bArr);
            }
            this.totalBytes += bArr.length;
            this.totalMessages++;
            KinesisRecordSet.log.debug("(%s:%s) Fetching %d bytes from current record. %d messages read so far", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId(), Integer.valueOf(bArr.length), Long.valueOf(this.totalMessages)});
            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeMessage = decodeMessage(bArr);
            HashMap hashMap = new HashMap();
            for (KinesisColumnHandle kinesisColumnHandle : KinesisRecordSet.this.columnHandles) {
                if (kinesisColumnHandle.isInternal()) {
                    KinesisInternalFieldDescription forColumnName = KinesisInternalFieldDescription.forColumnName(kinesisColumnHandle.getName());
                    switch (forColumnName) {
                        case SHARD_ID_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.bytesValueProvider(KinesisRecordSet.this.split.getShardId().getBytes(StandardCharsets.UTF_8)));
                            break;
                        case SEGMENT_START_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.bytesValueProvider(KinesisRecordSet.this.split.getStart().getBytes(StandardCharsets.UTF_8)));
                            break;
                        case SEGMENT_COUNT_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.longValueProvider(this.totalMessages));
                            break;
                        case SHARD_SEQUENCE_ID_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.bytesValueProvider(next.getSequenceNumber().getBytes(StandardCharsets.UTF_8)));
                            break;
                        case MESSAGE_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.bytesValueProvider(bArr));
                            break;
                        case MESSAGE_LENGTH_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.longValueProvider(bArr.length));
                            break;
                        case MESSAGE_VALID_FIELD:
                            hashMap.put(kinesisColumnHandle, FieldValueProviders.booleanValueProvider(decodeMessage.isEmpty()));
                            break;
                        default:
                            throw new IllegalArgumentException("unknown internal field " + forColumnName);
                    }
                }
            }
            Objects.requireNonNull(hashMap);
            decodeMessage.ifPresent(hashMap::putAll);
            for (int i = 0; i < KinesisRecordSet.this.columnHandles.size(); i++) {
                this.currentRowValues[i] = (FieldValueProvider) hashMap.get(KinesisRecordSet.this.columnHandles.get(i));
            }
            return true;
        }

        private Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeMessage(byte[] bArr) {
            KinesisCompressionCodec compressionCodec = KinesisRecordSet.this.split.getCompressionCodec();
            if (compressionCodec == KinesisCompressionCodec.UNCOMPRESSED) {
                return KinesisRecordSet.this.messageDecoder.decodeRow(bArr);
            }
            if (!KinesisRecordSet.isGZipped(bArr)) {
                if (compressionCodec == KinesisCompressionCodec.AUTOMATIC) {
                    return KinesisRecordSet.this.messageDecoder.decodeRow(bArr);
                }
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("A message was found that did not match the required %s compression codec. Consider using %s or %s compressionCodec in table description", compressionCodec, KinesisCompressionCodec.UNCOMPRESSED, KinesisCompressionCodec.AUTOMATIC));
            }
            if (!KinesisCompressionCodec.canUseGzip(compressionCodec)) {
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("A %s message was found that did not match the required %s compression codec. Consider using %s or %s compressionCodec in table description", KinesisCompressionCodec.GZIP, compressionCodec, KinesisCompressionCodec.GZIP, KinesisCompressionCodec.AUTOMATIC));
            }
            try {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
                try {
                    GZIPInputStream gZIPInputStream = new GZIPInputStream(byteArrayInputStream);
                    try {
                        Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow = KinesisRecordSet.this.messageDecoder.decodeRow(gZIPInputStream.readAllBytes());
                        gZIPInputStream.close();
                        byteArrayInputStream.close();
                        return decodeRow;
                    } catch (Throwable th) {
                        try {
                            gZIPInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, e);
            }
        }

        private long getMillisBehindLatest() {
            if (this.getRecordsResult == null || this.getRecordsResult.getMillisBehindLatest() == null) {
                return 10001L;
            }
            return this.getRecordsResult.getMillisBehindLatest().longValue();
        }

        public boolean getBoolean(int i) {
            return getFieldValueProvider(i, Boolean.TYPE).getBoolean();
        }

        public long getLong(int i) {
            return getFieldValueProvider(i, Long.TYPE).getLong();
        }

        public double getDouble(int i) {
            return getFieldValueProvider(i, Double.TYPE).getDouble();
        }

        public Slice getSlice(int i) {
            return getFieldValueProvider(i, Slice.class).getSlice();
        }

        public Object getObject(int i) {
            return getFieldValueProvider(i, Object.class).getObject();
        }

        public boolean isNull(int i) {
            Preconditions.checkArgument(i < KinesisRecordSet.this.columnHandles.size(), "Invalid field index");
            return this.currentRowValues[i] == null || this.currentRowValues[i].isNull();
        }

        private FieldValueProvider getFieldValueProvider(int i, Class<?> cls) {
            Preconditions.checkArgument(i < KinesisRecordSet.this.columnHandles.size(), "Invalid field index");
            checkFieldType(i, cls);
            return this.currentRowValues[i];
        }

        public void close() {
            KinesisRecordSet.log.info("(%s:%s) Closing cursor - read complete.  Total read: %d batches %d messages, processed: %d messages and %d bytes.", new Object[]{KinesisRecordSet.this.split.getStreamName(), KinesisRecordSet.this.split.getShardId(), Long.valueOf(this.batchesRead), Long.valueOf(this.messagesRead), Long.valueOf(this.totalMessages), Long.valueOf(this.totalBytes)});
            if (!KinesisRecordSet.this.checkpointEnabled || KinesisRecordSet.this.lastReadSequenceNumber == null) {
                return;
            }
            KinesisRecordSet.this.shardCheckpointer.checkpoint(KinesisRecordSet.this.lastReadSequenceNumber);
        }

        private void checkFieldType(int i, Class<?> cls) {
            Class<?> javaType = getType(i).getJavaType();
            Preconditions.checkArgument(javaType == cls, "Expected field %s to be type %s but is %s", Integer.valueOf(i), cls, javaType);
        }

        private void getIterator() throws ResourceNotFoundException {
            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
            getShardIteratorRequest.setStreamName(KinesisRecordSet.this.split.getStreamName());
            getShardIteratorRequest.setShardId(KinesisRecordSet.this.split.getShardId());
            if (KinesisRecordSet.this.lastReadSequenceNumber != null) {
                getShardIteratorRequest.setShardIteratorType("AFTER_SEQUENCE_NUMBER");
                getShardIteratorRequest.setStartingSequenceNumber(KinesisRecordSet.this.lastReadSequenceNumber);
            } else if (KinesisSessionProperties.isIteratorFromTimestamp(KinesisRecordSet.this.session)) {
                getShardIteratorRequest.setShardIteratorType("AT_TIMESTAMP");
                long iteratorStartTimestamp = KinesisSessionProperties.getIteratorStartTimestamp(KinesisRecordSet.this.session);
                if (iteratorStartTimestamp == 0) {
                    getShardIteratorRequest.setTimestamp(new Date(System.currentTimeMillis() - (KinesisSessionProperties.getIteratorOffsetSeconds(KinesisRecordSet.this.session) * 1000)));
                } else {
                    getShardIteratorRequest.setTimestamp(new Date(iteratorStartTimestamp));
                }
            } else {
                getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
            }
            this.shardIterator = KinesisRecordSet.this.clientManager.getClient().getShardIterator(getShardIteratorRequest).getShardIterator();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisRecordSet(KinesisSplit kinesisSplit, ConnectorSession connectorSession, KinesisClientProvider kinesisClientProvider, List<KinesisColumnHandle> list, RowDecoder rowDecoder, long j, long j2, boolean z, int i, Duration duration) {
        this.split = (KinesisSplit) Objects.requireNonNull(kinesisSplit, "split is null");
        this.session = (ConnectorSession) Objects.requireNonNull(connectorSession, "session is null");
        this.isLogBatches = z;
        this.clientManager = (KinesisClientProvider) Objects.requireNonNull(kinesisClientProvider, "clientManager is null");
        this.columnHandles = (List) Objects.requireNonNull(list, "columnHandles is null");
        this.messageDecoder = rowDecoder;
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<KinesisColumnHandle> it = list.iterator();
        while (it.hasNext()) {
            builder.add(it.next().getType());
        }
        this.columnTypes = builder.build();
        this.batchSize = KinesisSessionProperties.getBatchSize(connectorSession);
        this.maxBatches = KinesisSessionProperties.getMaxBatches(this.session);
        this.fetchAttempts = i;
        this.sleepTime = duration.toMillis();
        this.checkpointEnabled = KinesisSessionProperties.isCheckpointEnabled(connectorSession);
        this.lastReadSequenceNumber = null;
        if (!this.checkpointEnabled) {
            this.shardCheckpointer = null;
            return;
        }
        AmazonDynamoDBClient dynamoDbClient = kinesisClientProvider.getDynamoDbClient();
        String streamName = kinesisSplit.getStreamName();
        int iterationNumber = KinesisSessionProperties.getIterationNumber(connectorSession);
        String checkpointLogicalName = KinesisSessionProperties.getCheckpointLogicalName(connectorSession);
        this.shardCheckpointer = new KinesisShardCheckpointer(dynamoDbClient, streamName, kinesisSplit, checkpointLogicalName != null ? checkpointLogicalName : null, iterationNumber, j, j2);
        this.lastReadSequenceNumber = this.shardCheckpointer.getLastReadSeqNumber();
    }

    public List<Type> getColumnTypes() {
        return this.columnTypes;
    }

    public RecordCursor cursor() {
        return new KinesisRecordCursor();
    }

    private static boolean isGZipped(byte[] bArr) {
        return bArr != null && bArr.length >= 2 && ((bArr[0] & 255) | ((bArr[1] << 8) & 65280)) == 35615;
    }
}
