/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import kafka.log.OffsetPosition;
import kafka.log.TimestampOffset;
import kafka.server.DelayedOperationKey;
import kafka.server.TierOffsetForTimestampOperationKey;
import kafka.tier.TierTimestampAndOffset;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.TimestampIndexFetchRequest;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.record.FileRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingOffsetForTimestamp
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PendingOffsetForTimestamp.class);
    private final CancellationContext cancellationContext;
    private final TierObjectStore tierObjectStore;
    private final Map<TopicPartition, TierTimestampAndOffset> timestamps;
    private final Optional<TierFetcherMetrics> tierFetcherMetrics;
    private final Consumer<DelayedOperationKey> fetchCompletionCallback;
    private final ConcurrentHashMap<TopicPartition, Optional<FileRecords.FileTimestampAndOffset>> results = new ConcurrentHashMap();
    private final UUID requestId = UUID.randomUUID();
    private final TierSegmentReader reader;
    private final String logPrefix = "PendingOffsetForTimestamp(" + this.requestId + ")";

    PendingOffsetForTimestamp(CancellationContext cancellationContext, TierObjectStore tierObjectStore, Map<TopicPartition, TierTimestampAndOffset> timestamps, Optional<TierFetcherMetrics> tierFetcherMetrics, Consumer<DelayedOperationKey> fetchCompletionCallback) {
        this.cancellationContext = cancellationContext;
        this.tierObjectStore = tierObjectStore;
        this.timestamps = Collections.unmodifiableMap(timestamps);
        this.tierFetcherMetrics = tierFetcherMetrics;
        this.fetchCompletionCallback = fetchCompletionCallback;
        this.reader = new TierSegmentReader(this.logPrefix);
    }

    public boolean isDone() {
        return this.cancellationContext.isCancelled() || this.isComplete();
    }

    public boolean isComplete() {
        return this.results.size() == this.timestamps.size();
    }

    public Map<TopicPartition, Optional<FileRecords.FileTimestampAndOffset>> results() {
        return Collections.unmodifiableMap(this.results);
    }

    public void completeExceptionally(TopicPartition topicPartition, Exception exception) {
        TierTimestampAndOffset timestampAndOffset = this.timestamps.get(topicPartition);
        if (timestampAndOffset != null) {
            this.results.put(topicPartition, Optional.of(new FileRecords.FileTimestampAndOffset(timestampAndOffset.timestamp, timestampAndOffset.leaderEpoch(), exception)));
        }
    }

    public List<DelayedOperationKey> delayedOperationKeys() {
        return Collections.singletonList(new TierOffsetForTimestampOperationKey(this.requestId));
    }

    private void complete() {
        if (this.fetchCompletionCallback != null) {
            for (DelayedOperationKey key : this.delayedOperationKeys()) {
                this.fetchCompletionCallback.accept(key);
            }
        }
    }

    public Map<TopicPartition, TierTimestampAndOffset> tierTimestampAndOffsets() {
        return this.timestamps;
    }

    @Override
    public void run() {
        log.debug("Starting offsetForTimestamp. requestId={}, timestamps={}.", (Object)this.requestId, this.timestamps);
        for (Map.Entry<TopicPartition, TierTimestampAndOffset> entry : this.timestamps.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            TierTimestampAndOffset tierTimestampAndOffset = entry.getValue();
            TierObjectStore.ObjectMetadata objectMetadata = tierTimestampAndOffset.metadata;
            long targetTimestamp = tierTimestampAndOffset.timestamp;
            try {
                if (!this.fetchable(topicPartition)) continue;
                TimestampOffset indexOffsetTimestamp = TimestampIndexFetchRequest.fetchOffsetForTimestamp(this.cancellationContext, this.tierObjectStore, tierTimestampAndOffset.metadata, targetTimestamp);
                if (!this.fetchable(topicPartition)) continue;
                OffsetPosition offsetPosition = OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset(this.cancellationContext, this.tierObjectStore, objectMetadata, indexOffsetTimestamp.offset());
                if (!this.fetchable(topicPartition)) continue;
                TierObjectStoreResponse response = this.tierObjectStore.getObject(objectMetadata, TierObjectStore.FileType.SEGMENT, offsetPosition.position());
                Throwable throwable = null;
                try {
                    Optional<Long> offsetOpt = this.reader.offsetForTimestamp(this.cancellationContext, response.getInputStream(), targetTimestamp, tierTimestampAndOffset.segmentSize);
                    this.results.putIfAbsent(topicPartition, offsetOpt.map(offset -> new FileRecords.FileTimestampAndOffset(targetTimestamp, offset.longValue(), tierTimestampAndOffset.leaderEpoch())));
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (response == null) continue;
                    if (throwable != null) {
                        try {
                            response.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    response.close();
                }
            }
            catch (Exception e) {
                log.debug("Failed to fetch TierTimestampAndOffset {} from tiered storage", (Object)tierTimestampAndOffset, (Object)e);
                this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchOffsetForTimestampException().record());
                this.results.putIfAbsent(topicPartition, Optional.of(new FileRecords.FileTimestampAndOffset(targetTimestamp, tierTimestampAndOffset.leaderEpoch(), (Exception)new KafkaStorageException((Throwable)e))));
            }
        }
        this.complete();
    }

    private boolean fetchable(TopicPartition topicPartition) {
        return !this.cancellationContext.isCancelled() && !this.results.containsKey(topicPartition);
    }

    public void cancel() {
        this.cancellationContext.cancel();
    }
}

