/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.OptionalInt;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.offsetcache.FetchOffsetMetadata;
import org.apache.kafka.common.record.AbstractLegacyRecordBatch;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TierSegmentReader {
    private static final Logger log = LoggerFactory.getLogger(TierSegmentReader.class);
    private final String logPrefix;

    public TierSegmentReader(String logPrefix) {
        this.logPrefix = logPrefix;
    }

    public RecordsAndNextBatchMetadata readRecords(CancellationContext cancellationContext, InputStream inputStream, int maxBytes, long targetOffset, int startBytePosition, int segmentSize) throws IOException {
        NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata;
        BatchAndReadState firstBatchAndState = this.readFirstBatch(cancellationContext, inputStream, targetOffset, segmentSize);
        if (firstBatchAndState == null) {
            return new RecordsAndNextBatchMetadata(MemoryRecords.EMPTY, null);
        }
        RecordBatch firstBatch = firstBatchAndState.recordBatch;
        int firstBatchSize = firstBatch.sizeInBytes();
        int totalRequestBytes = Math.max(firstBatchSize, maxBytes);
        ByteBuffer buffer = ByteBuffer.allocate(totalRequestBytes);
        firstBatch.writeTo(buffer);
        ReadState firstBatchState = firstBatchAndState.readState;
        ReadState subsequentBatchState = this.readInto(cancellationContext, inputStream, buffer, segmentSize);
        if (subsequentBatchState.totalBytesRead > 0) {
            ReadState mergedState = new ReadState(firstBatchState.totalBytesRead + subsequentBatchState.totalBytesRead, subsequentBatchState.lastBatchStartPosition, subsequentBatchState.nextBatchSize, subsequentBatchState.safeToReadMore);
            nextOffsetAndBatchMetadata = this.determineNextFetchMetadata(buffer, inputStream, mergedState, startBytePosition, segmentSize);
        } else {
            ReadState mergedState = new ReadState(firstBatchState.totalBytesRead, firstBatchState.lastBatchStartPosition, firstBatchState.nextBatchSize, firstBatchState.safeToReadMore && subsequentBatchState.safeToReadMore);
            nextOffsetAndBatchMetadata = this.determineNextFetchMetadata(buffer, inputStream, mergedState, startBytePosition, segmentSize);
        }
        buffer.flip();
        return new RecordsAndNextBatchMetadata(new MemoryRecords(buffer), nextOffsetAndBatchMetadata);
    }

    public Optional<Long> offsetForTimestamp(CancellationContext cancellationContext, InputStream inputStream, long targetTimestamp, int segmentSize) throws IOException {
        while (!cancellationContext.isCancelled()) {
            RecordBatch recordBatch = this.readBatch(inputStream, segmentSize);
            if (recordBatch.maxTimestamp() < targetTimestamp) continue;
            for (Record record2 : recordBatch) {
                if (record2.timestamp() < targetTimestamp) continue;
                return Optional.of(record2.offset());
            }
        }
        return Optional.empty();
    }

    private BatchAndReadState readFirstBatch(CancellationContext cancellationContext, InputStream inputStream, long targetOffset, int segmentSize) throws IOException {
        RecordBatch firstBatch = null;
        int totalBytesRead = 0;
        while (!cancellationContext.isCancelled()) {
            RecordBatch recordBatch = this.readBatch(inputStream, segmentSize);
            totalBytesRead += recordBatch.sizeInBytes();
            if (recordBatch.baseOffset() > targetOffset || recordBatch.lastOffset() < targetOffset) continue;
            firstBatch = recordBatch;
            break;
        }
        if (firstBatch != null) {
            ReadState readState = new ReadState(totalBytesRead, 0, OptionalInt.empty(), true);
            log.debug("{} completed reading first batch: {}", (Object)this.logPrefix, (Object)readState);
            return new BatchAndReadState(firstBatch, readState);
        }
        log.debug("{} could not read first batch", (Object)this.logPrefix);
        return null;
    }

    private ReadState readInto(CancellationContext cancellationContext, InputStream inputStream, ByteBuffer buffer, int segmentSize) {
        int initialPosition;
        int lastBatchStartPosition = initialPosition = buffer.position();
        int totalBytesRead = 0;
        OptionalInt nextBatchSize = OptionalInt.empty();
        boolean readPartialData = false;
        while (!cancellationContext.isCancelled() && buffer.position() < buffer.limit()) {
            try {
                int headerStartPosition = buffer.position();
                int headerBytesRead = Utils.readBytes((InputStream)inputStream, (ByteBuffer)buffer, (int)17);
                if (headerBytesRead < 17) {
                    readPartialData = true;
                    break;
                }
                int batchSize = this.readMagicAndBatchSize(buffer, headerStartPosition, segmentSize).batchSize;
                int remainingBytes = batchSize - 17;
                int payloadBytesRead = Utils.readBytes((InputStream)inputStream, (ByteBuffer)buffer, (int)remainingBytes);
                if (payloadBytesRead < remainingBytes) {
                    log.debug("{} could not read full batch at end of stream", (Object)this.logPrefix);
                    nextBatchSize = OptionalInt.of(batchSize);
                    readPartialData = true;
                    break;
                }
                lastBatchStartPosition = headerStartPosition;
                totalBytesRead += headerBytesRead + remainingBytes;
            }
            catch (EOFException e) {
                readPartialData = true;
                log.debug("{} terminating read loop due to EOFException", (Object)this.logPrefix, (Object)e);
                break;
            }
            catch (IOException e) {
                readPartialData = true;
                log.error("{} terminating read loop due to IOException", (Object)this.logPrefix, (Object)e);
                break;
            }
        }
        buffer.position(initialPosition + totalBytesRead);
        ReadState state = new ReadState(totalBytesRead, lastBatchStartPosition, nextBatchSize, !readPartialData);
        log.debug("{} completed reading all batches: {}", (Object)this.logPrefix, (Object)state);
        return state;
    }

    private NextOffsetAndBatchMetadata determineNextFetchMetadata(ByteBuffer buffer, InputStream inputStream, ReadState readState, int startBytePosition, int segmentSize) throws IOException {
        if (readState.totalBytesRead == 0) {
            return null;
        }
        OptionalInt nextBatchSize = readState.nextBatchSize;
        if (!nextBatchSize.isPresent() && readState.safeToReadMore) {
            ByteBuffer tmpBuffer = ByteBuffer.allocate(17);
            Utils.readFully((InputStream)inputStream, (ByteBuffer)tmpBuffer);
            if (!tmpBuffer.hasRemaining()) {
                nextBatchSize = OptionalInt.of(this.readMagicAndBatchSize(tmpBuffer, 0, segmentSize).batchSize);
            }
        }
        long nextOffset = this.nextOffset(readState.lastBatchStartPosition, buffer);
        int nextOffsetBytePosition = startBytePosition + readState.totalBytesRead;
        if (nextOffsetBytePosition < segmentSize) {
            return new NextOffsetAndBatchMetadata(new FetchOffsetMetadata(nextOffsetBytePosition, nextBatchSize), nextOffset);
        }
        return null;
    }

    private long nextOffset(int finalBatchStartPosition, ByteBuffer buffer) {
        ByteBuffer duplicate = buffer.duplicate();
        duplicate.position(finalBatchStartPosition);
        ByteBuffer batchBuffer = duplicate.slice();
        byte magic = batchBuffer.get(16);
        Object batch = magic < 2 ? new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer) : new DefaultRecordBatch(batchBuffer);
        return batch.nextOffset();
    }

    private MagicAndBatchSizePair readMagicAndBatchSize(ByteBuffer buffer, int headerStartPosition, int segmentSize) {
        byte magic = buffer.get(headerStartPosition + 16);
        if (magic > 2) {
            throw new IllegalStateException(this.logPrefix + " unknown magic: " + magic);
        }
        int extraLength = 12;
        int batchSize = buffer.getInt(headerStartPosition + 8) + 12;
        if (batchSize <= 0 || batchSize > segmentSize) {
            throw new IllegalStateException(this.logPrefix + " illegal batch size: " + batchSize);
        }
        return new MagicAndBatchSizePair(magic, batchSize);
    }

    public RecordBatch readBatch(InputStream inputStream, int segmentSize) throws IOException {
        ByteBuffer logHeaderBuffer = ByteBuffer.allocate(17);
        int bytesRead = Utils.readBytes((InputStream)inputStream, (ByteBuffer)logHeaderBuffer, (int)17);
        if (bytesRead < 17) {
            throw new EOFException("Could not read HEADER_SIZE_UP_TO_MAGIC from InputStream");
        }
        logHeaderBuffer.rewind();
        MagicAndBatchSizePair magicAndBatchSizePair = this.readMagicAndBatchSize(logHeaderBuffer, 0, segmentSize);
        byte magic = magicAndBatchSizePair.magic;
        int batchSize = magicAndBatchSizePair.batchSize;
        ByteBuffer recordBatchBuffer = ByteBuffer.allocate(batchSize);
        recordBatchBuffer.put(logHeaderBuffer);
        int bytesToRead = recordBatchBuffer.limit() - recordBatchBuffer.position();
        int recordBatchBytesRead = Utils.readBytes((InputStream)inputStream, (ByteBuffer)recordBatchBuffer, (int)bytesToRead);
        if (recordBatchBytesRead < bytesToRead) {
            throw new EOFException("Attempted to read a record batch of size " + batchSize + " but was only able to read " + recordBatchBytesRead + " bytes");
        }
        recordBatchBuffer.rewind();
        Object recordBatch = magic < 2 ? new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(recordBatchBuffer) : new DefaultRecordBatch(recordBatchBuffer);
        return recordBatch;
    }

    private static class ReadState {
        final int totalBytesRead;
        final int lastBatchStartPosition;
        final OptionalInt nextBatchSize;
        final boolean safeToReadMore;

        private ReadState(int totalBytesRead, int lastBatchStartPosition, OptionalInt nextBatchSize, boolean safeToReadMore) {
            this.totalBytesRead = totalBytesRead;
            this.lastBatchStartPosition = lastBatchStartPosition;
            this.nextBatchSize = nextBatchSize;
            this.safeToReadMore = safeToReadMore;
        }

        public String toString() {
            return "ReadState(totalBytesRead=" + this.totalBytesRead + ", lastBatchStartPosition=" + this.lastBatchStartPosition + ", nextBatchSize=" + this.nextBatchSize + ", safeToReadMore=" + this.safeToReadMore + ")";
        }
    }

    private static class BatchAndReadState {
        private final RecordBatch recordBatch;
        private final ReadState readState;

        BatchAndReadState(RecordBatch recordBatch, ReadState readState) {
            this.recordBatch = recordBatch;
            this.readState = readState;
        }
    }

    public static class NextOffsetAndBatchMetadata {
        final FetchOffsetMetadata nextBatchMetadata;
        final long nextOffset;

        public NextOffsetAndBatchMetadata(FetchOffsetMetadata nextBatchMetadata, long nextOffset) {
            this.nextBatchMetadata = nextBatchMetadata;
            this.nextOffset = nextOffset;
        }

        public String toString() {
            return "NextOffsetAndBatchMetadata(nextBatchMetadata=" + this.nextBatchMetadata + ", nextOffset=" + this.nextOffset + ')';
        }
    }

    public static class RecordsAndNextBatchMetadata {
        final MemoryRecords records;
        final NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata;

        public RecordsAndNextBatchMetadata(MemoryRecords records, NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata) {
            this.records = records;
            this.nextOffsetAndBatchMetadata = nextOffsetAndBatchMetadata;
        }
    }

    private static class MagicAndBatchSizePair {
        private final byte magic;
        private final int batchSize;

        MagicAndBatchSizePair(byte magic, int batchSize) {
            this.magic = magic;
            this.batchSize = batchSize;
        }
    }
}

