/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.server.DelayedOperationKey;
import kafka.tier.TierTimestampAndOffset;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.PendingFetch;
import kafka.tier.fetcher.PendingOffsetForTimestamp;
import kafka.tier.fetcher.TierFetchMetadata;
import kafka.tier.fetcher.TierFetcherConfig;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.store.TierObjectStore;
import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import scala.Function0;
import scala.compat.java8.JFunction;
import scala.runtime.BoxedUnit;

/**
 * Entry point for reading data from tiered storage.
 *
 * <p>Requests are wrapped in {@link PendingFetch} / {@link PendingOffsetForTimestamp}
 * tasks and handed to a fixed-size thread pool owned by this class. Every task is
 * given a sub-context of a single root {@link CancellationContext}, which
 * {@link #close()} cancels.
 */
public class TierFetcher {
    /** Flipped to {@code true} exactly once, by the first call to {@link #close()}. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Root context; each request gets its own sub-context derived from it. */
    private final CancellationContext cancellationContext = CancellationContext.newContext();
    private final Logger logger;
    private final TierObjectStore tierObjectStore;
    /** Pool that runs the pending fetch / offset-for-timestamp tasks. */
    private final ThreadPoolExecutor executorService;
    public final TierFetcherMetrics tierFetcherMetrics;
    /** Offset cache handed to each {@link PendingFetch}; entries are expired by a scheduled task. */
    final FetchOffsetCache cache;

    /**
     * Creates the fetcher, its thread pool, offset cache and metrics, and registers a
     * recurring task on {@code scheduler} that calls {@link FetchOffsetCache#expireEntries()}.
     *
     * @param tierFetcherConfig supplies the thread count and the offset cache settings
     * @param tierObjectStore   object store passed to every task created by this fetcher
     * @param scheduler         scheduler on which the cache-expiry task is registered
     * @param metrics           registry passed to {@link TierFetcherMetrics}
     * @param logContext        used to create this instance's logger
     */
    public TierFetcher(TierFetcherConfig tierFetcherConfig, TierObjectStore tierObjectStore, KafkaScheduler scheduler, Metrics metrics, LogContext logContext) {
        this.tierObjectStore = tierObjectStore;
        this.logger = logContext.logger(TierFetcher.class);
        // The cast is needed because TierFetcherMetrics is given the concrete ThreadPoolExecutor.
        this.executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(tierFetcherConfig.numFetchThreads);
        this.cache = new FetchOffsetCache(Time.SYSTEM, tierFetcherConfig.offsetCacheSize, tierFetcherConfig.offsetCacheExpirationMs);
        this.tierFetcherMetrics = new TierFetcherMetrics(metrics, this.executorService, this.cache);
        // offsetCacheExpiryPeriodMs is passed for both timing arguments
        // (presumably initial delay and period -- confirm against KafkaScheduler.schedule).
        scheduler.schedule("tier-fetcher-clear-fetch-offset-cache", (Function0<BoxedUnit>)JFunction.func(() -> {
            this.cache.expireEntries();
            return null;
        }), tierFetcherConfig.offsetCacheExpiryPeriodMs, tierFetcherConfig.offsetCacheExpiryPeriodMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Package-private convenience constructor that uses a default-constructed
     * {@link TierFetcherConfig} and {@link LogContext}.
     */
    TierFetcher(TierObjectStore tierObjectStore, KafkaScheduler scheduler, Metrics metrics) {
        this(new TierFetcherConfig(), tierObjectStore, scheduler, metrics, new LogContext());
    }

    /**
     * Stops the fetcher. The first call cancels the root cancellation context and calls
     * {@code shutdownNow()} on the thread pool; later calls only log.
     */
    public void close() {
        this.logger.info("Closing TierFetcher");
        if (this.stopped.compareAndSet(false, true)) {
            this.cancellationContext.cancel();
            this.executorService.shutdownNow();
        }
    }

    /**
     * Builds, but does not submit, a {@link PendingFetch} for the first entry of
     * {@code tierFetchMetadataList}. Only that first entry is fetched; the topic partitions
     * of all remaining entries are passed to the {@link PendingFetch} as ignored partitions.
     *
     * @param tierFetchMetadataList   fetch descriptors; must be non-empty with a non-null first entry
     * @param isolationLevel          isolation level forwarded to the {@link PendingFetch}
     * @param fetchCompletionCallback callback forwarded to the {@link PendingFetch}
     * @return the unscheduled pending fetch
     * @throws IllegalStateException if the list is empty, its first entry is {@code null},
     *                               or {@link #close()} has already been called
     */
    public PendingFetch buildFetch(List<TierFetchMetadata> tierFetchMetadataList, IsolationLevel isolationLevel, Consumer<DelayedOperationKey> fetchCompletionCallback) {
        if (tierFetchMetadataList.isEmpty()) {
            throw new IllegalStateException("No TierFetchMetadata supplied to TierFetcher fetch request");
        }
        TierFetchMetadata firstFetchMetadata = tierFetchMetadataList.get(0);
        if (firstFetchMetadata == null) {
            throw new IllegalStateException("No TierFetchMetadata supplied, cannot start fetch");
        }
        if (this.stopped.get()) {
            throw new IllegalStateException("TierFetcher is shutting down, request was not scheduled");
        }

        // Parameterized logging: the message is only rendered when debug is enabled.
        this.logger.debug("Fetching {} from tiered storage", firstFetchMetadata.topicPartition());

        // Collected only after validation so that rejected requests do no extra work.
        List<TopicPartition> ignoredTopicPartitions = tierFetchMetadataList
                .subList(1, tierFetchMetadataList.size())
                .stream()
                .map(TierFetchMetadata::topicPartition)
                .collect(Collectors.toList());

        long targetOffset = firstFetchMetadata.fetchStartOffset();
        int maxBytes = firstFetchMetadata.maxBytes();
        CancellationContext fetchContext = this.cancellationContext.subContext();
        return new PendingFetch(fetchContext, this.tierObjectStore, this.cache, Optional.of(this.tierFetcherMetrics), firstFetchMetadata.segmentMetadata(), fetchCompletionCallback, targetOffset, maxBytes, firstFetchMetadata.segmentSize(), isolationLevel, ignoredTopicPartitions);
    }

    /**
     * Builds a {@link PendingFetch} via {@link #buildFetch} and submits it to the thread pool.
     *
     * @return the submitted pending fetch
     * @throws IllegalStateException under the same conditions as {@link #buildFetch}
     */
    public PendingFetch fetch(List<TierFetchMetadata> tierFetchMetadataList, IsolationLevel isolationLevel, Consumer<DelayedOperationKey> fetchCompletionCallback) {
        PendingFetch fetch = this.buildFetch(tierFetchMetadataList, isolationLevel, fetchCompletionCallback);
        this.executorService.execute(fetch);
        return fetch;
    }

    /**
     * Creates a {@link PendingOffsetForTimestamp} for the given partitions and submits it
     * to the thread pool.
     *
     * <p>NOTE(review): unlike {@link #buildFetch}, this method does not consult the
     * {@code stopped} flag before submitting; after {@link #close()} the outcome is whatever
     * the shut-down executor does with the task. Confirm whether callers rely on that.
     *
     * @param tierTimestampAndOffsets per-partition lookup input forwarded to the pending task
     * @param fetchCompletionCallback callback forwarded to the pending task
     * @return the submitted pending lookup
     */
    public PendingOffsetForTimestamp fetchOffsetForTimestamp(Map<TopicPartition, TierTimestampAndOffset> tierTimestampAndOffsets, Consumer<DelayedOperationKey> fetchCompletionCallback) {
        CancellationContext lookupContext = this.cancellationContext.subContext();
        PendingOffsetForTimestamp pending = new PendingOffsetForTimestamp(lookupContext, this.tierObjectStore, tierTimestampAndOffsets, Optional.of(this.tierFetcherMetrics), fetchCompletionCallback);
        this.executorService.execute(pending);
        return pending;
    }
}

