/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import kafka.log.AbortedTxn;
import kafka.log.OffsetPosition;
import kafka.server.DelayedOperationKey;
import kafka.server.TierFetchOperationKey;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.fetcher.TierAbortedTxnReader;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetMetadata;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single in-flight read of records from a tiered segment object.
 *
 * <p>{@link #run()} resolves the byte position for {@code targetOffset}, reads the segment
 * through the {@link TierObjectStore}, optionally reads the aborted-transaction index, and
 * completes an internal promise with a {@link TierFetchResult}. {@link #finish()} blocks on
 * that promise and returns the per-partition results. Failures are carried inside the
 * {@link TierFetchResult}; the promise itself is always completed normally by this class.
 */
public class PendingFetch
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PendingFetch.class);
    /**
     * Extra bytes added to a known first-batch size when computing a ranged read.
     * NOTE(review): presumably the inlined value of Records.HEADER_SIZE_UP_TO_MAGIC - confirm.
     */
    private static final int FIRST_BATCH_HEADER_BYTES = 17;
    private final CancellationContext cancellationContext;
    private final TierObjectStore tierObjectStore;
    private final Optional<TierFetcherMetrics> tierFetcherMetrics;
    private final TierObjectStore.ObjectMetadata objectMetadata;
    private final Consumer<DelayedOperationKey> fetchCompletionCallback;
    private final long targetOffset;
    private final int maxBytes;
    private final int segmentSize;
    private final List<TopicPartition> ignoredTopicPartitions;
    private final UUID requestId = UUID.randomUUID();
    private final CompletableFuture<TierFetchResult> transferPromise;
    private final IsolationLevel isolationLevel;
    private final FetchOffsetCache cache;
    private final FetchOffsetMetadata fetchOffsetMetadata;
    private final TierSegmentReader reader;
    private final String logPrefix;

    /**
     * Creates a pending fetch; no I/O against the object store happens until {@link #run()}.
     *
     * @param cancellationContext     consulted before fetching and passed to the readers
     * @param tierObjectStore         store the segment and transaction index are read from
     * @param cache                   looked up here for {@code targetOffset} and updated after a read
     * @param tierFetcherMetrics      optional metrics sink
     * @param objectMetadata          identifies the tiered object to read
     * @param fetchCompletionCallback invoked once per delayed-operation key on completion; may be null
     * @param targetOffset            offset to start reading from
     * @param maxBytes                byte budget handed to the segment reader
     * @param segmentSize             segment size handed to the segment reader
     * @param isolationLevel          aborted transactions are only read for {@code READ_COMMITTED}
     * @param ignoredTopicPartitions  partitions reported with an empty result by {@link #finish()}
     */
    PendingFetch(CancellationContext cancellationContext, TierObjectStore tierObjectStore, FetchOffsetCache cache, Optional<TierFetcherMetrics> tierFetcherMetrics, TierObjectStore.ObjectMetadata objectMetadata, Consumer<DelayedOperationKey> fetchCompletionCallback, long targetOffset, int maxBytes, int segmentSize, IsolationLevel isolationLevel, List<TopicPartition> ignoredTopicPartitions) {
        this.cancellationContext = cancellationContext;
        this.tierObjectStore = tierObjectStore;
        this.tierFetcherMetrics = tierFetcherMetrics;
        this.objectMetadata = objectMetadata;
        this.fetchCompletionCallback = fetchCompletionCallback;
        this.targetOffset = targetOffset;
        this.maxBytes = maxBytes;
        this.segmentSize = segmentSize;
        this.cache = cache;
        this.ignoredTopicPartitions = ignoredTopicPartitions;
        this.transferPromise = new CompletableFuture<>();
        this.isolationLevel = isolationLevel;
        this.logPrefix = "PendingFetch(requestId=" + this.requestId + ")";
        this.reader = new TierSegmentReader(this.logPrefix);
        if (targetOffset == objectMetadata.baseOffset()) {
            // Reading from the segment's base offset: start at byte 0, batch size unknown.
            this.fetchOffsetMetadata = new FetchOffsetMetadata(0, OptionalInt.empty());
        } else {
            // The lookup result is treated as possibly null; fetchOffsetPosition() then
            // falls back to the offset index.
            this.fetchOffsetMetadata = cache.get(objectMetadata.objectId(), targetOffset);
        }
    }

    /**
     * Returns the single delayed-operation key for this fetch, built from the object's
     * topic partition and this request's id.
     */
    public List<DelayedOperationKey> delayedOperationKeys() {
        return Collections.singletonList(new TierFetchOperationKey(this.objectMetadata.topicIdPartition().topicPartition(), this.requestId));
    }

    /** Returns true once the fetch result (successful, empty, or failed) has been published. */
    public boolean isComplete() {
        return this.transferPromise.isDone();
    }

    /**
     * Resolves where in the segment to start reading: the known byte position when fetch
     * metadata is available, otherwise a lookup through the offset index.
     */
    private OffsetPosition fetchOffsetPosition() throws Exception {
        if (this.fetchOffsetMetadata == null) {
            log.debug("{} fetching offset index", this.logPrefix);
            return OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset(this.cancellationContext, this.tierObjectStore, this.objectMetadata, this.targetOffset);
        }
        log.debug("{} using fetch position {}", this.logPrefix, this.fetchOffsetMetadata);
        return new OffsetPosition(this.targetOffset, this.fetchOffsetMetadata.bytePosition);
    }

    /**
     * Computes the end position for a ranged segment read.
     *
     * @return the end position when the first batch size is known, or {@code null} when the
     *         read should be unbounded. The value is saturated at {@code Integer.MAX_VALUE}
     *         so that a large {@code maxBytes} cannot wrap the sum to a negative position.
     */
    private Integer getEndRange(OffsetPosition offsetPosition) {
        if (this.fetchOffsetMetadata == null || !this.fetchOffsetMetadata.recordBatchSize.isPresent()) {
            return null;
        }
        // Widen to long before adding: maxBytes may be close to Integer.MAX_VALUE.
        long firstBatchBytes = (long)this.fetchOffsetMetadata.recordBatchSize.getAsInt() + FIRST_BATCH_HEADER_BYTES;
        long length = Math.max(firstBatchBytes, (long)this.maxBytes);
        long endRange = (long)offsetPosition.position() + length;
        return (int)Math.min(endRange, (long)Integer.MAX_VALUE);
    }

    /**
     * Opens the segment object at {@code offsetPosition}, using a ranged read when
     * {@link #getEndRange(OffsetPosition)} yields an end position.
     */
    private TierObjectStoreResponse fetchSegment(OffsetPosition offsetPosition) throws IOException {
        Integer endRange = this.getEndRange(offsetPosition);
        if (endRange == null) {
            log.debug("{} fetching segment startPosition: {}", this.logPrefix, offsetPosition);
            return this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.SEGMENT, offsetPosition.position());
        }
        log.debug("{} fetching segment startPosition: {}, endPosition: {}", this.logPrefix, offsetPosition, endRange);
        return this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.SEGMENT, offsetPosition.position(), endRange);
    }

    /**
     * Reads the aborted transactions for the offset range covered by {@code records}: from the
     * first batch's base offset to the final batch's last offset.
     *
     * @return the aborted transactions, or an empty list when {@code records} has no batches
     */
    private List<AbortedTxn> fetchAbortedTxns(MemoryRecords records) throws Exception {
        // Track "saw a batch" explicitly: offset 0 is a valid offset and must not double
        // as the "no batches" sentinel.
        boolean sawBatch = false;
        long startOffset = 0L;
        long lastOffset = 0L;
        for (RecordBatch recordBatch : records.batches()) {
            if (!sawBatch) {
                startOffset = recordBatch.baseOffset();
                sawBatch = true;
            }
            lastOffset = recordBatch.lastOffset();
        }
        if (!sawBatch) {
            return Collections.emptyList();
        }
        try (TierObjectStoreResponse abortedTransactionsResponse = this.tierObjectStore.getObject(this.objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX)) {
            return TierAbortedTxnReader.readInto(this.cancellationContext, abortedTransactionsResponse.getInputStream(), startOffset, lastOffset);
        }
    }

    /**
     * Executes the fetch and publishes its outcome.
     *
     * <p>A fetch that is already cancelled completes with empty records. Any exception is
     * captured into the result rather than thrown. If closing the segment response fails after
     * a successful completion, the completion path runs a second time with the exception; the
     * promise keeps its first value because {@link CompletableFuture#complete} does not
     * overwrite an already-completed future.
     */
    @Override
    public void run() {
        try {
            log.debug("Starting tiered fetch. requestId={}, objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={}.", this.requestId, this.objectMetadata, this.targetOffset, this.maxBytes, this.isolationLevel);
            if (this.cancellationContext.isCancelled()) {
                this.completeFetch(MemoryRecords.EMPTY, Collections.emptyList(), null);
                return;
            }
            OffsetPosition offsetPosition = this.fetchOffsetPosition();
            try (TierObjectStoreResponse response = this.fetchSegment(offsetPosition)) {
                TierSegmentReader.RecordsAndNextBatchMetadata recordsAndNextBatchMetadata = this.reader.readRecords(this.cancellationContext, response.getInputStream(), this.maxBytes, this.targetOffset, offsetPosition.position(), this.segmentSize);
                MemoryRecords records = recordsAndNextBatchMetadata.records;
                this.updateCache(recordsAndNextBatchMetadata.nextOffsetAndBatchMetadata);
                boolean needsAbortedTxns = this.objectMetadata.hasAbortedTxns() && this.isolationLevel == IsolationLevel.READ_COMMITTED;
                List<AbortedTxn> abortedTxns = needsAbortedTxns ? this.fetchAbortedTxns(records) : Collections.<AbortedTxn>emptyList();
                this.completeFetch(records, abortedTxns, null);
            }
        }
        catch (Exception e) {
            this.completeFetch(MemoryRecords.EMPTY, Collections.emptyList(), e);
        }
    }

    /**
     * Stores the metadata of the batch following this read in the cache, keyed by object id
     * and next offset. Does nothing when either piece of metadata is absent.
     */
    private void updateCache(TierSegmentReader.NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata) {
        if (nextOffsetAndBatchMetadata == null) {
            return;
        }
        FetchOffsetMetadata nextBatchMetadata = nextOffsetAndBatchMetadata.nextBatchMetadata;
        if (nextBatchMetadata == null) {
            return;
        }
        log.debug("{} updating cache. metadata: {}", this.logPrefix, nextOffsetAndBatchMetadata);
        this.cache.put(this.objectMetadata.objectId(), nextOffsetAndBatchMetadata.nextOffset, nextBatchMetadata);
    }

    /**
     * Blocks until the fetch result is available and returns it keyed by topic partition.
     *
     * <p>Every ignored partition is mapped to an empty result. If the waiting thread is
     * interrupted, the fetched partition is mapped to an empty result and the thread's
     * interrupt flag is restored so callers can observe the interruption.
     *
     * @return a mutable map containing the fetched partition and all ignored partitions
     */
    public Map<TopicPartition, TierFetchResult> finish() {
        Map<TopicPartition, TierFetchResult> resultMap = new HashMap<>();
        TopicPartition topicPartition = this.objectMetadata.topicIdPartition().topicPartition();
        try {
            TierFetchResult tierFetchResult = this.transferPromise.get();
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.bytesFetched().record((double)tierFetchResult.records.sizeInBytes()));
            resultMap.put(topicPartition, tierFetchResult);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            resultMap.put(topicPartition, TierFetchResult.emptyFetchResult());
        }
        catch (ExecutionException e) {
            log.warn("Failed exceptionally while finishing pending fetch request for partition {} from tiered storage. This exception is unexpected as the promise in not completed exceptionally ", topicPartition, e);
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchException().record());
            resultMap.put(topicPartition, new TierFetchResult((Records)MemoryRecords.EMPTY, Collections.emptyList(), e.getCause()));
        }
        for (TopicPartition ignoredTopicPartition : this.ignoredTopicPartitions) {
            resultMap.put(ignoredTopicPartition, TierFetchResult.emptyFetchResult());
        }
        return resultMap;
    }

    /** Cancels the underlying cancellation context and records the cancellation metric. */
    public void cancel() {
        this.cancellationContext.cancel();
        this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchCancelled().record());
    }

    /**
     * Publishes the fetch outcome and notifies the completion callback, if any, once per
     * delayed-operation key.
     *
     * @param records     records to publish
     * @param abortedTxns aborted transactions to publish alongside the records
     * @param throwable   failure to log, count and embed in the result; null on success
     */
    private void completeFetch(MemoryRecords records, List<AbortedTxn> abortedTxns, Throwable throwable) {
        if (throwable != null) {
            log.error("{} tier fetch completed with exception", this.logPrefix, throwable);
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchException().record());
        }
        this.transferPromise.complete(new TierFetchResult((Records)records, abortedTxns, throwable));
        if (this.fetchCompletionCallback == null) {
            return;
        }
        for (DelayedOperationKey key : this.delayedOperationKeys()) {
            this.fetchCompletionCallback.accept(key);
        }
    }
}

