/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import kafka.common.FetchedTimestampAndOffset;
import kafka.common.TierUnfetchedTimestampAndOffset;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.server.DelayedOperation;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.MemoryTracker;
import kafka.tier.fetcher.PendingFetch;
import kafka.tier.fetcher.PendingOffsetForTimestamp;
import kafka.tier.fetcher.ReclaimableMemoryRecords;
import kafka.tier.fetcher.TierFetchMetadata;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcher;
import kafka.tier.fetcher.TierFetcherConfig;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.MockInMemoryTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.VersionInformation;
import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.Option;
import scala.collection.JavaConverters;
import scala.compat.java8.OptionConverters;

public class TierFetcherTest {
    // Mock clock shared by the tests in this class; it is passed to the TierFetcher and LogSegment instances they create.
    private MockTime mockTime = new MockTime();

    /**
     * Waits up to {@code timeoutMs} milliseconds for {@code future} to complete.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @param future the future to wait on
     * @return {@code true} if the future completed normally within the timeout; {@code false} if
     *         the wait timed out, the calling thread was interrupted, or the future was cancelled
     *         or completed exceptionally
     */
    private boolean futureReady(long timeoutMs, CompletableFuture<?> future) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception ignored) {
            // Timeout, cancellation and exceptional completion all mean "not ready" to the callers.
            return false;
        }
    }

    /**
     * Verifies that a fetch which cannot obtain a memory lease stays pending until it is
     * cancelled, and that cancelling it fires the completion callback.
     */
    @Test
    public void tierFetcherCancellationUnblocksWaitingForMemory() throws Exception {
        ByteBuffer combinedBuffer = this.getMemoryRecordsBuffer();
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(combinedBuffer, ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
        TierFetcherConfig tierFetcherConfig = new TierFetcherConfig(1, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.valueOf(1L));
        TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, tierFetcherConfig, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics, new LogContext());
        try {
            // Take a 1 MiB lease up front; the assertion below confirms further leases are then refused.
            tierFetcher.memoryTracker().newLease(CancellationContext.newContext(), 0x100000L);
            Assertions.assertFalse(tierFetcher.memoryTracker().tryLease(100L).isPresent(),
                    "expected tierfetcher to have less than zero bytes available in pool, further lease attempts should fail");

            int maxBytes = 600;
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture<Boolean> future = new CompletableFuture<>();
            PendingFetch pending = tierFetcher.fetch(
                    new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                    IsolationLevel.READ_UNCOMMITTED,
                    ignored -> future.complete(true),
                    0);

            // The previous version built a Thread around `pending` but never started it, so its
            // join() returned immediately; the callback is driven by tierFetcher.fetch() alone.
            Assertions.assertFalse(this.futureReady(100L, future), "expected fetch to be blocked on memory allocation");
            pending.cancel();
            Assertions.assertTrue(this.futureReady(1000L, future), "expected canceling the fetch to unblock the memory allocation");
        } finally {
            // Close the fetcher like the sibling tests do, so it is not leaked between tests.
            tierFetcher.close();
        }
    }

    /**
     * Verifies that when the object store fails the next request, the fetch callback still
     * fires, the result carries empty records, the exception metric is incremented while the
     * bytes-fetched metric stays at zero, and no memory remains leased once the fetcher is closed.
     */
    @Test
    public void tierFetcherExceptionCausesOnComplete() throws Exception {
        ByteBuffer offsetIndexBuffer = ByteBuffer.allocate(0);
        ByteBuffer segmentFileBuffer = this.getMemoryRecordsBuffer();
        ByteBuffer timestampFileBuffer = ByteBuffer.allocate(0);
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampFileBuffer);
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
        // Declared outside the try block so it is still in scope for the lease assertion after close().
        TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
        try {
            int maxBytes = 600;
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture<Boolean> completed = new CompletableFuture<>();
            tierObjectStore.failNextRequest();

            double bytesFetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            double exceptionsBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
            Assertions.assertEquals(0.0, bytesFetchedBefore, 0.0);
            Assertions.assertEquals(0.0, exceptionsBefore, 0.0);

            PendingFetch pending = tierFetcher.fetch(
                    new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                    IsolationLevel.READ_UNCOMMITTED,
                    ignored -> completed.complete(true),
                    0);
            Assertions.assertTrue(completed.get(2000L, TimeUnit.MILLISECONDS));

            TierFetchResult result = (TierFetchResult) pending.finish().get(tierObjectMetadata.topicIdPartition().topicPartition());
            Assertions.assertEquals((Object) ReclaimableMemoryRecords.EMPTY, (Object) result.records, "expected returned records to be empty due to exception thrown");

            double bytesFetchedAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            double exceptionsAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
            Assertions.assertEquals(0.0, bytesFetchedAfter, 0.0);
            Assertions.assertEquals(1.0, exceptionsAfter, 0.0);
        } finally {
            tierFetcher.close();
        }
        Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
    }

    /**
     * Verifies that cancelling a fetch right after submitting it still fires the completion
     * callback, that marking the fetch expired afterwards records exactly one cancellation, and
     * that no memory remains leased once the fetcher is closed.
     */
    @Test
    public void tierFetcherFetchCancelled() throws Exception {
        ByteBuffer offsetIndexBuffer = ByteBuffer.allocate(0);
        ByteBuffer segmentFileBuffer = this.getMemoryRecordsBuffer();
        ByteBuffer timestampFileBuffer = ByteBuffer.allocate(0);
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampFileBuffer);
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
        // Declared outside the try block so it is still in scope for the lease assertion after close().
        TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
        try {
            int maxBytes = 600;
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture<Boolean> completed = new CompletableFuture<>();

            double bytesFetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            double cancellationsBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue();
            Assertions.assertEquals(0.0, bytesFetchedBefore, 0.0);
            Assertions.assertEquals(0.0, cancellationsBefore, 0.0);

            PendingFetch pending = tierFetcher.fetch(
                    new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                    IsolationLevel.READ_UNCOMMITTED,
                    ignored -> completed.complete(true),
                    0);
            pending.cancel();
            Assertions.assertTrue(completed.get(2000L, TimeUnit.MILLISECONDS));

            Map<?, ?> results = pending.finish();
            TierFetchResult result = (TierFetchResult) results.get(topicIdPartition.topicPartition());
            // The cancel races with the fetch, so bytes/exception metrics are only checked
            // conditionally on what the result actually contains.
            if (result.records.sizeInBytes() > 0) {
                double bytesFetched = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                Assertions.assertTrue(bytesFetched > 0.0);
            }
            if (result.exception == null) {
                double exceptions = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
                Assertions.assertEquals(0.0, exceptions, 0.0);
            }

            pending.markFetchExpired();
            double cancellationsAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue();
            Assertions.assertEquals(1.0, cancellationsAfter, 0.0, "expected 1 cancellation");
            result.records.release();
        } finally {
            tierFetcher.close();
        }
        Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
    }

    /**
     * Builds two record batches (offsets 0-49 and 50-100, every record with key "a", value "v"
     * and timestamp 1) and returns them back to back in one buffer that is flipped for reading.
     *
     * @return a readable buffer holding the first batch followed by the second
     */
    private ByteBuffer getMemoryRecordsBuffer() {
        ByteBuffer firstBatchBuffer = ByteBuffer.allocate(2048);
        ByteBuffer secondBatchBuffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(firstBatchBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(secondBatchBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

        for (int offset = 0; offset < 50; offset++) {
            firstBatch.appendWithOffset((long) offset, 1L, "a".getBytes(), "v".getBytes());
        }
        for (int offset = 50; offset < 101; offset++) {
            secondBatch.appendWithOffset((long) offset, 1L, "a".getBytes(), "v".getBytes());
        }

        firstBatch.build();
        secondBatch.build();
        firstBatchBuffer.flip();
        secondBatchBuffer.flip();

        ByteBuffer combined = ByteBuffer.allocate(0x400000);
        combined.put(firstBatchBuffer);
        combined.put(secondBatchBuffer);
        combined.flip();
        return combined;
    }

    /**
     * Fetches from offset 0 against an object store whose offset and timestamp index buffers are
     * empty, and verifies that the bytes-fetched metric becomes positive, that the returned
     * records have consecutive offsets starting at 0, and that no memory remains leased once the
     * fetcher is closed.
     */
    @Test
    public void tierFetcherRequestEmptyIndexTest() throws Exception {
        ByteBuffer combinedBuffer = this.getMemoryRecordsBuffer();
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(combinedBuffer, ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
        // Declared outside the try block so it is still in scope for the lease assertion after close().
        TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
        try {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(10000), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture<Boolean> completed = new CompletableFuture<>();

            double bytesFetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assertions.assertEquals(0.0, bytesFetchedBefore, 0.0);

            PendingFetch pending = tierFetcher.fetch(
                    new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                    IsolationLevel.READ_UNCOMMITTED,
                    ignored -> completed.complete(true),
                    0);
            MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
            Assertions.assertTrue(completed.get(2000L, TimeUnit.MILLISECONDS));

            Map<?, ?> fetchResults = pending.finish();
            Assertions.assertNotNull((Object) fetchResults, "expected non-null fetch result");
            double bytesFetchedAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assertions.assertTrue(bytesFetchedAfter > 0.0);
            Assertions.assertTrue((boolean) delayedFetch.tryComplete());

            TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicIdPartition.topicPartition());
            ReclaimableMemoryRecords records = fetchResult.records;
            long expectedOffset = 0L;
            for (Record record : records.records()) {
                // Expected value first, matching the other tests in this class.
                Assertions.assertEquals(expectedOffset, record.offset(), "Offset not expected");
                ++expectedOffset;
            }
            fetchResult.records.release();
        } finally {
            tierFetcher.close();
        }
        Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
    }

    /**
     * Writes five 50-record batches into a local log segment whose index interval is one byte
     * larger than the first batch, serves the segment and index files through the mocked object
     * store, and fetches repeatedly from offset 150 to the end of the segment.
     *
     * <p>Asserts that every fetch returns consecutive offsets, that the fetcher's cache size
     * matches the expected entry count after each fetch, that the offset index was read exactly
     * once, and that no memory remains leased once the fetcher is closed.
     */
    @Test
    public void tierFetcherLocateTargetOffsetTest() throws Exception {
        MemoryRecords[] recordArr = new MemoryRecords[]{this.buildWithOffset(0L, 50), this.buildWithOffset(50L, 50), this.buildWithOffset(100L, 50), this.buildWithOffset(150L, 50), this.buildWithOffset(200L, 50)};
        int indexInterval = recordArr[0].sizeInBytes() + 1;
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)indexInterval);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");
        try {
            for (MemoryRecords batchRecords : recordArr) {
                // The append needs the last offset of the final batch in this MemoryRecords.
                long largestOffset = ((MutableRecordBatch) batchRecords.batches().iterator().next()).baseOffset();
                for (MutableRecordBatch batch : batchRecords.batches()) {
                    largestOffset = batch.lastOffset();
                }
                logSegment.append(largestOffset, batchRecords);
            }
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            long expectedEndOffset = logSegment.readNextOffset() - 1L;

            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            // NOTE(review): this reads the offset index file again for the "timestamp index"
            // buffer; looks like a copy-paste slip, kept as-is. Confirm which file was intended.
            File timestampIndexFile = logSegment.offsetIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));

            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            UUID objectId = UUID.randomUUID();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, objectId, 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
            // Declared outside the inner try block so it is still in scope for the lease assertion after close().
            TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
            try {
                int expectedCacheEntries = 0;
                long fetchOffset = 150L;
                while (fetchOffset < expectedEndOffset) {
                    TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), fetchOffset, Integer.valueOf(1000), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, segmentFileBuffer.limit());
                    CompletableFuture<Boolean> completed = new CompletableFuture<>();
                    PendingFetch pending = tierFetcher.fetch(
                            new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                            IsolationLevel.READ_UNCOMMITTED,
                            ignored -> completed.complete(true),
                            0);
                    MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                    Assertions.assertTrue(completed.get(4000L, TimeUnit.MILLISECONDS));

                    Map<?, ?> fetchResults = pending.finish();
                    Assertions.assertNotNull((Object) fetchResults, "expected non-null fetch result");
                    double bytesFetched = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                    Assertions.assertTrue(bytesFetched > 0.0);
                    Assertions.assertTrue((boolean) delayedFetch.tryComplete());

                    TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicIdPartition.topicPartition());
                    ReclaimableMemoryRecords fetched = fetchResult.records;
                    for (Record record : fetched.records()) {
                        Assertions.assertEquals(fetchOffset, record.offset(), "Offset not expected");
                        ++fetchOffset;
                    }
                    // A new cache entry is only expected while there is still data left to fetch.
                    if (fetchOffset < expectedEndOffset) {
                        ++expectedCacheEntries;
                    }
                    long expected = expectedCacheEntries;
                    TestUtils.waitForCondition(() -> expected == tierFetcher.cache.size(), (String)"cache not updated by timeout");
                    fetchResult.records.release();
                }
                Assertions.assertEquals(fetchOffset - 1L, expectedEndOffset);
                Assertions.assertEquals(1, (int) tierObjectStore.offsetIndexReads, "offset index should have been used exactly once, for the initial fetch");
            } finally {
                tierFetcher.close();
            }
            Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        }
        finally {
            logSegment.deleteIfExists();
        }
    }

    /**
     * Shared body of the enabled/disabled offset-cache tests. Writes four 50-record batches into
     * a local log segment, serves it through the mocked object store, and fetches sequentially
     * from offset 0 to the end of the segment.
     *
     * <p>With the cache enabled it expects the cache to grow after every non-final fetch, a hit
     * ratio of 1.0 and no offset index reads. With the cache disabled it expects an empty cache
     * and a hit ratio of 0.0. In both modes no memory may remain leased after close.
     *
     * @param isCacheEnabled {@code true} to build the fetcher with its default configuration;
     *                       {@code false} to configure an offset cache size of "0"
     */
    public void tierFetcherRepeatedFetchesViaOffsetCacheTest(boolean isCacheEnabled) throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");
        try {
            // Four consecutive 50-record batches, flushed one at a time.
            for (int batch = 0; batch < 4; batch++) {
                long baseOffset = logSegment.readNextOffset();
                logSegment.append(baseOffset + 49L, this.buildWithOffset(baseOffset, 50));
                logSegment.flush();
            }
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            long expectedEndOffset = logSegment.readNextOffset() - 1L;

            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.offsetIndex().file().toPath()));
            // NOTE(review): the "timestamp index" buffer is also read from the offset index file — confirm intent.
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.offsetIndex().file().toPath()));
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.log().file().toPath()));

            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            UUID objectId = UUID.randomUUID();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, objectId, 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);

            TierFetcher tierFetcher;
            if (!isCacheEnabled) {
                HashMap<String, String> cfg = new HashMap<String, String>();
                cfg.put(KafkaConfig.ZkConnectProp(), "127.0.0.1:0000");
                cfg.put(KafkaConfig.TierFetcherOffsetCacheSizeProp(), "0");
                TierFetcherConfig config = new TierFetcherConfig(new KafkaConfig(cfg));
                tierFetcher = new TierFetcher((Time) this.mockTime, config, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics, new LogContext());
            } else {
                tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
            }

            try {
                int expectedCacheEntries = 0;
                long nextOffset = 0L;
                while (nextOffset < expectedEndOffset) {
                    TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), nextOffset, Integer.valueOf(1000), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, segmentFileBuffer.limit());
                    CompletableFuture<Boolean> done = new CompletableFuture<>();
                    PendingFetch pending = tierFetcher.fetch(
                            new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                            IsolationLevel.READ_UNCOMMITTED,
                            ignored -> done.complete(true),
                            0);
                    MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                    Assertions.assertTrue(done.get(4000L, TimeUnit.MILLISECONDS));

                    Map<?, ?> fetchResults = pending.finish();
                    Assertions.assertNotNull((Object) fetchResults, "expected non-null fetch result");
                    double bytesFetched = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                    Assertions.assertTrue(bytesFetched > 0.0);
                    Assertions.assertTrue((boolean) delayedFetch.tryComplete());

                    TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicIdPartition.topicPartition());
                    for (Record record : fetchResult.records.records()) {
                        Assertions.assertEquals(nextOffset, record.offset(), "Offset not expected");
                        ++nextOffset;
                    }

                    if (isCacheEnabled) {
                        // The final fetch is not expected to add another cache entry.
                        if (nextOffset < expectedEndOffset) {
                            ++expectedCacheEntries;
                        }
                        long expected = expectedCacheEntries;
                        TestUtils.waitForCondition(() -> expected == tierFetcher.cache.size(), (String)"cache not updated by timeout");
                    }
                    fetchResult.records.release();
                }

                Assertions.assertEquals(nextOffset - 1L, expectedEndOffset);
                if (!isCacheEnabled) {
                    Assertions.assertEquals((long)0L, (long)tierFetcher.cache.size());
                    Assertions.assertEquals((double)0.0, (double)tierFetcher.cache.hitRatio(), (double)0.0);
                } else {
                    Assertions.assertEquals((double)1.0, (double)tierFetcher.cache.hitRatio(), (double)1.0E-4);
                    Assertions.assertEquals((int)0, (int)tierObjectStore.offsetIndexReads, "offset index should not have been used");
                }
            } finally {
                tierFetcher.close();
            }
            Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        } finally {
            logSegment.deleteIfExists();
        }
    }

    /** Runs the repeated-fetch scenario with the offset cache disabled. */
    @Test
    public void tierFetcherRepeatedFetchesWithDisabledOffsetCacheTest() throws Exception {
        final boolean cacheEnabled = false;
        tierFetcherRepeatedFetchesViaOffsetCacheTest(cacheEnabled);
    }

    /** Runs the repeated-fetch scenario with the offset cache enabled. */
    @Test
    public void tierFetcherRepeatedFetchesWithEnabledOffsetCacheTest() throws Exception {
        final boolean cacheEnabled = true;
        tierFetcherRepeatedFetchesViaOffsetCacheTest(cacheEnabled);
    }

    /**
     * Builds a single uncompressed batch of {@code numRecords} records with consecutive offsets
     * starting at {@code baseOffset}. Each record uses its own offset as its timestamp and has
     * key "a" and value "v".
     *
     * @param baseOffset offset of the first record, also used as the batch base offset
     * @param numRecords number of records to append
     * @return the built records
     */
    private MemoryRecords buildWithOffset(long baseOffset, int numRecords) {
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(
                ByteBuffer.allocate(2048),
                (byte) 2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                baseOffset);
        for (int i = 0; i < numRecords; i++) {
            long offset = baseOffset + (long) i;
            recordsBuilder.appendWithOffset(offset, offset, "a".getBytes(), "v".getBytes());
        }
        return recordsBuilder.build();
    }

    /**
     * Writes three 50-record batches into a local log segment with an index interval of 1 byte,
     * serves the segment and index files through the mocked object store, and fetches starting
     * at offset 100.
     *
     * <p>Asserts that the bytes-fetched metric becomes positive, that the returned records have
     * consecutive offsets starting at 100, and that no memory remains leased once the fetcher is
     * closed.
     */
    @Test
    public void tierFetcherIndexTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");){
            logSegment.append(logSegment.readNextOffset() + 49L, this.buildWithOffset(logSegment.readNextOffset(), 50));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, this.buildWithOffset(logSegment.readNextOffset(), 50));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, this.buildWithOffset(logSegment.readNextOffset(), 50));
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();

            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            // NOTE(review): this reads the offset index file again for the "timestamp index"
            // buffer; looks like a copy-paste slip, kept as-is. Confirm which file was intended.
            File timestampIndexFile = logSegment.offsetIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));

            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler) EasyMock.createNiceMock(KafkaScheduler.class);
            // Declared outside the inner try block so it is still in scope for the lease assertion after close().
            TierFetcher tierFetcher = new TierFetcher((Time) this.mockTime, (TierObjectStore) tierObjectStore, kafkaScheduler, metrics);
            try {
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 100L, Integer.valueOf(10000), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
                CompletableFuture<Boolean> completed = new CompletableFuture<>();

                double bytesFetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                Assertions.assertEquals(0.0, bytesFetchedBefore, 0.0);

                PendingFetch pending = tierFetcher.fetch(
                        new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)),
                        IsolationLevel.READ_UNCOMMITTED,
                        ignored -> completed.complete(true),
                        0);
                MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                Assertions.assertTrue(completed.get(2000L, TimeUnit.MILLISECONDS));

                Map<?, ?> fetchResults = pending.finish();
                Assertions.assertNotNull((Object) fetchResults, "expected non-null fetch result");
                double bytesFetchedAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                Assertions.assertTrue(bytesFetchedAfter > 0.0);
                Assertions.assertTrue((boolean) delayedFetch.tryComplete());

                TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicIdPartition.topicPartition());
                ReclaimableMemoryRecords records = fetchResult.records;
                long expectedOffset = 100L;
                for (Record record : records.records()) {
                    Assertions.assertEquals(expectedOffset, record.offset(), "Offset not expected");
                    ++expectedOffset;
                }
                fetchResult.records.release();
            } finally {
                tierFetcher.close();
            }
            Assertions.assertEquals(0L, (long) tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises {@code TierFetcher.fetchOffsetForTimestamp} against a {@link MockedTierObjectStore}.
     *
     * <p>A local log segment is filled with four batches built by {@code buildWithOffset(..., 50)};
     * the bytes of its segment file, offset index and timestamp index are then served by the mock
     * store. The test performs two lookups for timestamp 101:
     * <ol>
     *   <li>a successful one, expected to yield {@code FetchedTimestampAndOffset(101, 101, None)};</li>
     *   <li>one issued right after {@code failNextRequest()}, expected to carry a response
     *       exception and to bump the "fetch offset for timestamp exception" metric to 1.</li>
     * </ol>
     * After the fetcher is closed, no bytes may remain leased from its memory tracker.
     *
     * @throws Exception if segment setup, the timed waits, or closing the fetcher fail
     */
    @Test
    public void tierTimestampIndexTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");){
            // Append four batches; the first three are followed by a full segment flush, the last
            // one only by the explicit index flushes below.
            MemoryRecords records1 = this.buildWithOffset(logSegment.readNextOffset(), 50);
            long largestOffset1 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset1, records1);
            logSegment.flush();
            MemoryRecords records2 = this.buildWithOffset(logSegment.readNextOffset(), 50);
            long largestOffset2 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset2, records2);
            logSegment.flush();
            MemoryRecords records3 = this.buildWithOffset(logSegment.readNextOffset(), 50);
            long largestOffset3 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset3, records3);
            logSegment.flush();
            MemoryRecords records4 = this.buildWithOffset(logSegment.readNextOffset(), 50);
            long largestOffset4 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset4, records4);
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            logSegment.timeIndex().flush();
            logSegment.timeIndex().trimToValidSize();

            // Snapshot the on-disk files into heap buffers for the mock object store to serve.
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.timeIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);

            // The fetcher is declared outside the try block because it is still inspected after
            // being closed (see the leased-bytes assertion at the end).
            TierFetcher tierFetcher = new TierFetcher((Time)this.mockTime, (TierObjectStore)tierObjectStore, kafkaScheduler, metrics);
            try {
                // Lookup 1: the store answers normally.
                CompletableFuture<Boolean> lookupDone = new CompletableFuture<>();
                HashMap<TopicPartition, TierUnfetchedTimestampAndOffset> timestamps = new HashMap<TopicPartition, TierUnfetchedTimestampAndOffset>();
                timestamps.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(101L, tierObjectMetadata, segmentFileBuffer.limit()));
                PendingOffsetForTimestamp pending = tierFetcher.fetchOffsetForTimestamp(timestamps, ignored -> lookupDone.complete(true));
                lookupDone.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertEquals(Optional.of(FetchedTimestampAndOffset.apply((long)101L, (long)101L, (Option)Option.empty())), pending.results().get(topicIdPartition.topicPartition()), (String)"incorrect offset for supplied timestamp returned");

                // Lookup 2: the store fails its next request. A separate future is used because a
                // lambda may only capture effectively-final locals.
                tierObjectStore.failNextRequest();
                CompletableFuture<Boolean> failedLookupDone = new CompletableFuture<>();
                HashMap<TopicPartition, TierUnfetchedTimestampAndOffset> failedTimestamps = new HashMap<TopicPartition, TierUnfetchedTimestampAndOffset>();
                failedTimestamps.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(101L, tierObjectMetadata, segmentFileBuffer.limit()));
                PendingOffsetForTimestamp failedPending = tierFetcher.fetchOffsetForTimestamp(failedTimestamps, ignored -> failedLookupDone.complete(true));
                failedLookupDone.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertNotNull((Object)((FetchedTimestampAndOffset)((Optional)failedPending.results().get(topicIdPartition.topicPartition())).get()).responseException(), (String)"tier object store through exception, pending result should have been completed exceptionally");
                Assertions.assertEquals((double)1.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchOffsetForTimestampExceptionTotalMetricName).metricValue()), (double)0.0);
            } finally {
                tierFetcher.close();
            }
            Assertions.assertEquals(0L, (long)tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks how the {@code maxBytes} override passed to {@code TierFetcher.fetch} bounds a fetch.
     *
     * <p>Three batches (offsets 0-49, 50-99 and 100-149) are appended to a local segment and the
     * segment size after each append is recorded in {@code bytesToOffset}. The segment is then
     * served through a {@link MockedTierObjectStore} and fetched from offset 0 once per entry in
     * {@code overrideMaxBytesArray}. For every override the test expects:
     * <ul>
     *   <li>at most {@code min(max(maxBytes, override), segmentSize)} bytes to be returned;</li>
     *   <li>record offsets to be contiguous starting at 0;</li>
     *   <li>the last returned offset to be the one recorded for the largest segment size that
     *       does not exceed that byte budget.</li>
     * </ul>
     * After the fetcher is closed, no bytes may remain leased from its memory tracker.
     *
     * @throws Exception if segment setup, the timed waits, or closing the fetcher fail
     */
    @Test
    public void tierFetcherMaxBytesTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");){
            // Key: segment size in bytes right after an append; value: last offset of that append.
            TreeMap<Integer, Long> bytesToOffset = new TreeMap<Integer, Long>();
            MemoryRecords toAppend = this.buildWithOffset(0L, 50);
            logSegment.append(49L, toAppend);
            bytesToOffset.put(logSegment.size(), 49L);
            toAppend = this.buildWithOffset(50L, 50);
            logSegment.append(99L, toAppend);
            bytesToOffset.put(logSegment.size(), 99L);
            toAppend = this.buildWithOffset(100L, 50);
            logSegment.append(149L, toAppend);
            bytesToOffset.put(logSegment.size(), 149L);
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();

            // Snapshot the on-disk files into heap buffers for the mock object store to serve.
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.timeIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TopicPartition topicPartition = topicIdPartition.topicPartition();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);

            // The fetcher is declared outside the try block because it is still inspected after
            // being closed (see the leased-bytes assertion at the end).
            TierFetcher tierFetcher = new TierFetcher((Time)this.mockTime, (TierObjectStore)tierObjectStore, kafkaScheduler, metrics);
            try {
                int maxBytes = 600;
                int[] overrideMaxBytesArray = new int[]{0, 400, 600, segmentFileBuffer.remaining() - 1, segmentFileBuffer.remaining(), segmentFileBuffer.remaining() + 1};
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
                // Expected value first, so a failure reports "expected 0.0 but was ...".
                Assertions.assertEquals((Object)0.0, (Object)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());
                for (int overrideMaxBytes : overrideMaxBytesArray) {
                    CompletableFuture<Boolean> fetchDone = new CompletableFuture<>();
                    PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> fetchDone.complete(true), overrideMaxBytes);
                    MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                    Assertions.assertTrue(fetchDone.get(2000L, TimeUnit.MILLISECONDS));
                    Map fetchResults = pending.finish();
                    Assertions.assertNotNull((Object)fetchResults, (String)"expected non-null fetch result");
                    Assertions.assertTrue((Double)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue() > 0.0);
                    Assertions.assertTrue((boolean)delayedFetch.tryComplete());
                    TierFetchResult fetchResult = (TierFetchResult)fetchResults.get(topicPartition);
                    ReclaimableMemoryRecords records = fetchResult.records;

                    // The effective budget is the larger of the two limits, capped by the segment size.
                    int expectedFetchBytes = Math.min(Math.max(maxBytes, overrideMaxBytes), segmentFileBuffer.remaining());
                    Assertions.assertTrue(fetchResult.records.sizeInBytes() <= expectedFetchBytes);

                    long currentOffset = -1L;
                    for (Record record : records.records()) {
                        ++currentOffset;
                        Assertions.assertEquals(currentOffset, (long)record.offset(), "Offset not expected");
                    }
                    long expectedLastOffset = bytesToOffset.floorEntry(expectedFetchBytes).getValue();
                    Assertions.assertEquals(expectedLastOffset, currentOffset, "Unexpected lastOffset for overrideMaxBytes " + overrideMaxBytes);
                    fetchResult.records.release();
                }
            } finally {
                tierFetcher.close();
            }
            Assertions.assertEquals(0L, (long)tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        }
    }

    /**
     * Runs a {@code PendingFetch} synchronously against a {@link MockedTierObjectStore} that has
     * been told to fail its next request, then expects the fetch result for the partition to
     * carry an exception and the {@code MemoryTracker} to report zero leased bytes.
     */
    @Test
    public void testTierObjectStoreExceptionReleasesLease() {
        CancellationContext cancellation = CancellationContext.newContext();
        ByteBuffer segmentBytes = this.getMemoryRecordsBuffer();
        // Only the segment buffer has content; both index buffers are empty.
        MockedTierObjectStore failingStore = new MockedTierObjectStore(segmentBytes, ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        FetchOffsetCache offsetCache = new FetchOffsetCache((Time)this.mockTime, 0, 0);
        TopicIdPartition partition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(partition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        MemoryTracker tracker = new MemoryTracker((Time)this.mockTime, 1L);
        PendingFetch fetch = new PendingFetch(cancellation, (TierObjectStore)failingStore, offsetCache, Optional.empty(), metadata, ignored -> {}, 0L, 1024, 1024, IsolationLevel.READ_UNCOMMITTED, tracker, Collections.emptyList(), (Time)this.mockTime);

        // Arm the one-shot failure, then execute the fetch on the calling thread.
        failingStore.failNextRequest();
        fetch.run();

        Map resultsByPartition = fetch.finish();
        TierFetchResult result = (TierFetchResult)resultsByPartition.get(partition.topicPartition());
        Assertions.assertNotNull((Object)result.exception, "expected fetch to return an exception");
        Assertions.assertEquals(0L, (long)tracker.leased(), "expected all memory to be returned to the memory tracker");
    }

    /**
     * Checks the memory lease taken by a {@code PendingFetch} when the fetch offset cache already
     * holds a first-batch size for the object.
     *
     * <p>The cache is seeded with a first batch size of 2048 for the object at offset 1, while the
     * {@code MemoryTracker} is created with a capacity of 1024 and a lease of twice that capacity
     * is taken up front. The fetch runs on its own thread and is expected to be incomplete until
     * that lease is released and the tracker is woken up. Once the fetch thread has finished,
     * exactly 2048 bytes must be leased, and releasing the fetched records must bring the leased
     * amount back to zero.
     *
     * @throws InterruptedException if interrupted while waiting for the fetch thread to finish
     */
    @Test
    public void memoryLeaseWithKnownFirstBatchSize() throws InterruptedException {
        CancellationContext cancellation = CancellationContext.newContext();
        ByteBuffer segmentBytes = this.getMemoryRecordsBuffer();
        MockedTierObjectStore store = new MockedTierObjectStore(segmentBytes, ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        FetchOffsetCache offsetCache = new FetchOffsetCache((Time)this.mockTime, Integer.MAX_VALUE, Integer.MAX_VALUE);
        TopicIdPartition partition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(partition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);

        int knownFirstBatchBytes = 2048;
        long trackerCapacity = 1024L;
        offsetCache.put(metadata.objectId(), 1L, 0, OptionalInt.of(knownFirstBatchBytes));

        MemoryTracker tracker = new MemoryTracker((Time)this.mockTime, trackerCapacity);
        // Hold a lease of twice the capacity before the fetch starts.
        MemoryTracker.MemoryLease blockingLease = tracker.newLease(cancellation, trackerCapacity * 2L);
        PendingFetch fetch = new PendingFetch(cancellation, (TierObjectStore)store, offsetCache, Optional.empty(), metadata, ignored -> {}, 1L, 1024, 1024, IsolationLevel.READ_UNCOMMITTED, tracker, Collections.emptyList(), (Time)this.mockTime);

        Thread fetchThread = new Thread((Runnable)fetch);
        fetchThread.start();
        Assertions.assertFalse((boolean)fetch.isComplete(), "expected pending fetch to be blocked on memory allocation");

        // Give the memory back and wake the tracker, then wait for the fetch thread to end.
        blockingLease.release();
        tracker.wakeup();
        fetchThread.join();

        Map resultsByPartition = fetch.finish();
        TierFetchResult result = (TierFetchResult)resultsByPartition.get(partition.topicPartition());
        Assertions.assertEquals((long)knownFirstBatchBytes, (long)tracker.leased(), "expected exactly firstBatchSize bytes to be leased");
        result.records.release();
        Assertions.assertEquals(0L, (long)tracker.leased(), "expected releasing the records returns leased memory to the MemoryTracker");
    }

    /**
     * Verifies that the tier fetcher memory pool size can be changed through
     * {@code TierFetcher.reconfigure}.
     *
     * <p>A fetcher is built from a {@code KafkaConfig} with the pool size set to 1024. The test
     * checks that the pool size property is listed in {@code reconfigurableConfigs()} and that
     * the memory tracker reports 1024, then reconfigures with the property set to 0 and expects
     * the tracker to report 0. The fetcher is closed when the test ends, whether or not the
     * assertions pass.
     *
     * @throws Exception if closing the fetcher fails
     */
    @Test
    public void testResizeTierFetcherMemoryPoolDynamically() throws Exception {
        HashMap<String, String> cfg = new HashMap<String, String>();
        cfg.put(KafkaConfig.ZkConnectProp(), "127.0.0.1:0000");
        cfg.put(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp(), "1024");
        KafkaConfig oldConfig = new KafkaConfig(cfg);
        TierFetcherConfig config = new TierFetcherConfig(oldConfig);
        KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
        TierFetcher tierFetcher = new TierFetcher((Time)this.mockTime, config, (TierObjectStore)new MockInMemoryTierObjectStore((Time)this.mockTime, new MockInMemoryTierObjectStoreConfig()), kafkaScheduler, new Metrics(), new LogContext());
        try {
            Assertions.assertTrue((boolean)tierFetcher.reconfigurableConfigs().contains((Object)KafkaConfig.TierFetcherMemoryPoolSizeBytesProp()), "expected TierFetcher memory pool size to be reconfigurable");
            // Expected value goes first so that failure messages read correctly.
            Assertions.assertEquals(1024L, (long)tierFetcher.memoryTracker().poolSize(), "expected TierFetcher memory pool size to match what was set originally");

            cfg.put(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp(), "0");
            tierFetcher.reconfigure(null, new KafkaConfig(cfg));
            Assertions.assertEquals(0L, (long)tierFetcher.memoryTracker().poolSize(), "expected TierFetcher memory pool size to be updated to the new size");
        } finally {
            // Other tests in this class close their TierFetcher; do the same here.
            tierFetcher.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies the percentile metrics for total tier fetch time.
     *
     * <p>Three records are written to a temp file and uploaded as a segment to a Mockito mock of
     * {@code MockInMemoryTierObjectStore} whose {@code putObject} and three-argument
     * {@code getObject} delegate to the real methods. One hundred fetches are then issued; before
     * each one the four-argument {@code getObject} is re-stubbed to advance {@code mockTime} and
     * then call the real method. The advance is 100 ms for the first 89 fetches, 500 ms for the
     * next 9 and 1000 ms for the last 2. The 50th, 90th and 99th percentile metrics are expected
     * to read 100, 500 and 1000 respectively (tolerance 0.001). After the fetcher is closed, no
     * bytes may remain leased from its memory tracker.
     *
     * @throws Exception if writing the temp file, the object store calls, or closing the fetcher fail
     */
    @Test
    public void testTierFetchTotalTimeMs() throws Exception {
        SimpleRecord[] simpleRecords = new SimpleRecord[]{new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(3L, "c".getBytes(), "3".getBytes())};
        ByteBuffer buffer = MemoryRecords.withRecords((byte)2, (long)0L, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (SimpleRecord[])simpleRecords).buffer();
        File file = TestUtils.tempFile();
        // try-with-resources closes the stream and channel even if a write fails; the loop
        // covers the case where a single write() call does not drain the buffer.
        try (FileOutputStream out = new FileOutputStream(file, false);
             FileChannel channel = out.getChannel()) {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        }
        MockInMemoryTierObjectStore tierObjectStore = (MockInMemoryTierObjectStore)Mockito.mock(MockInMemoryTierObjectStore.class);
        ((MockInMemoryTierObjectStore)Mockito.doAnswer((Answer)new Answer<TierObjectStoreResponse>(){

            public TierObjectStoreResponse answer(InvocationOnMock invocation) throws Throwable {
                return (TierObjectStoreResponse)invocation.callRealMethod();
            }
        }).when((Object)tierObjectStore)).getObject((TierObjectStore.ObjectStoreMetadata)ArgumentMatchers.any(), (TierObjectStore.FileType)ArgumentMatchers.any(), (Integer)ArgumentMatchers.any());
        ((MockInMemoryTierObjectStore)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                invocation.callRealMethod();
                return null;
            }
        }).when((Object)tierObjectStore)).putObject((TierObjectStore.ObjectStoreMetadata)ArgumentMatchers.any(), (File)ArgumentMatchers.any(), (TierObjectStore.FileType)ArgumentMatchers.any());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, TierObjectStore.OpaqueData.ZEROED);
        tierObjectStore.putObject((TierObjectStore.ObjectStoreMetadata)tierObjectMetadata, file, TierObjectStore.FileType.SEGMENT);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
        // Size of the backing array, which is independent of the buffer position moved by the write above.
        int maxBytes = buffer.array().length;

        // The fetcher is declared outside the try block because it is still inspected after
        // being closed (see the leased-bytes assertion at the end).
        TierFetcher tierFetcher = new TierFetcher((Time)this.mockTime, (TierObjectStore)tierObjectStore, kafkaScheduler, metrics);
        try {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            for (int i = 0; i < 100; ++i) {
                // 89 fetches at 100 ms, 9 at 500 ms, 2 at 1000 ms.
                final int sleepTimeMs = i < 89 ? 100 : (i < 98 ? 500 : 1000);
                ((MockInMemoryTierObjectStore)Mockito.doAnswer((Answer)new Answer<TierObjectStoreResponse>(){

                    public TierObjectStoreResponse answer(InvocationOnMock invocation) throws Throwable {
                        TierFetcherTest.this.mockTime.sleep((long)sleepTimeMs);
                        return (TierObjectStoreResponse)invocation.callRealMethod();
                    }
                }).when((Object)tierObjectStore)).getObject((TierObjectStore.ObjectStoreMetadata)ArgumentMatchers.any(), (TierObjectStore.FileType)ArgumentMatchers.any(), (Integer)ArgumentMatchers.any(), (Integer)ArgumentMatchers.any());
                CompletableFuture future = new CompletableFuture();
                PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> future.complete(true), 0);
                this.futureReady(2000L, future);
                Map fetchResults = pending.finish();
                Assertions.assertNotNull((Object)fetchResults, (String)"expected non-null fetch result");
            }
            Assertions.assertEquals((double)100.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs50PercentileMetricName).metricValue()), (double)0.001);
            Assertions.assertEquals((double)500.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs90PercentileMetricName).metricValue()), (double)0.001);
            Assertions.assertEquals((double)1000.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs99PercentileMetricName).metricValue()), (double)0.001);
        } finally {
            tierFetcher.close();
        }
        Assertions.assertEquals(0L, (long)tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
    }

    /**
     * In-memory {@link TierObjectStore} test double that serves three fixed buffers: one for the
     * segment, one for the offset index and one for the timestamp index.
     *
     * <p>Reads are counted per file type, and a single upcoming {@code getObject} call can be made
     * to fail via {@link #failNextRequest()}. Write-style operations are either no-ops or throw,
     * as documented on each method.
     */
    class MockedTierObjectStore
    implements TierObjectStore {
        private final ByteBuffer segmentByteBuffer;
        private final ByteBuffer offsetByteBuffer;
        private final ByteBuffer timestampByteBuffer;
        // One-shot failure switch: consumed (reset to false) by the next getObject call.
        private final AtomicBoolean failNextRequest = new AtomicBoolean(false);
        // Number of getObject calls served per file type (failed calls are not counted).
        int segmentReads = 0;
        int offsetIndexReads = 0;
        int timestampIndexReads = 0;

        /**
         * @param segmentByteBuffer bytes returned for {@code FileType.SEGMENT}
         * @param indexByteBuffer bytes returned for {@code FileType.OFFSET_INDEX}
         * @param timestampByteBuffer bytes returned for {@code FileType.TIMESTAMP_INDEX}
         */
        MockedTierObjectStore(ByteBuffer segmentByteBuffer, ByteBuffer indexByteBuffer, ByteBuffer timestampByteBuffer) {
            this.segmentByteBuffer = segmentByteBuffer;
            this.offsetByteBuffer = indexByteBuffer;
            this.timestampByteBuffer = timestampByteBuffer;
        }

        /** Makes the next {@code getObject} call throw an {@link IOException}. */
        void failNextRequest() {
            this.failNextRequest.set(true);
        }

        /** No-op; this mock holds no external resources. */
        public void close() {
        }

        public TierObjectStore.Backend getBackend() {
            return TierObjectStore.Backend.Mock;
        }

        /** Not supported by this mock; always returns {@code null}. */
        public Map<String, List<VersionInformation>> listObject(String keyPrefix, boolean getVersionInfo) {
            return null;
        }

        /**
         * Returns a copy of the bytes in {@code [byteOffset, byteOffsetEnd)} of the buffer that
         * backs {@code fileType}.
         *
         * <p>Indexes are relative to the start of the buffer (index 0 .. {@code limit()}), so the
         * method works for sliced and non-array-backed buffers as well. The source buffer's own
         * position and limit are never modified.
         *
         * @param objectMetadata ignored by this mock
         * @param fileType which of the three buffers to read
         * @param byteOffset inclusive start index, or {@code null} for 0
         * @param byteOffsetEnd exclusive end index, or {@code null} for the buffer limit; values
         *        beyond the limit are clamped to it
         * @return a response whose stream yields the requested bytes
         * @throws IOException if {@link #failNextRequest()} was armed
         * @throws UnsupportedOperationException for any other file type
         * @throws IllegalArgumentException if the start index is negative or lies past the end index
         */
        public TierObjectStoreResponse getObject(TierObjectStore.ObjectStoreMetadata objectMetadata, TierObjectStore.FileType fileType, Integer byteOffset, Integer byteOffsetEnd) throws IOException {
            if (this.failNextRequest.compareAndSet(true, false)) {
                throw new IOException("Failed to retrieve object.");
            }
            ByteBuffer source;
            if (fileType == TierObjectStore.FileType.OFFSET_INDEX) {
                ++this.offsetIndexReads;
                source = this.offsetByteBuffer;
            } else if (fileType == TierObjectStore.FileType.SEGMENT) {
                ++this.segmentReads;
                source = this.segmentByteBuffer;
            } else if (fileType == TierObjectStore.FileType.TIMESTAMP_INDEX) {
                ++this.timestampIndexReads;
                source = this.timestampByteBuffer;
            } else {
                throw new UnsupportedOperationException();
            }
            int start = byteOffset == null ? 0 : byteOffset;
            int end = byteOffsetEnd == null ? source.limit() : Math.min(byteOffsetEnd, source.limit());
            if (start < 0 || start > end) {
                // Fail with a readable message instead of a negative-capacity allocation error.
                throw new IllegalArgumentException("Invalid byte range [" + start + ", " + end + ") for " + fileType + " object of " + source.limit() + " bytes");
            }
            // Work on a duplicate so the shared source buffer's position/limit stay untouched.
            ByteBuffer window = source.duplicate();
            window.limit(end);
            window.position(start);
            ByteBuffer copy = ByteBuffer.allocate(end - start);
            copy.put(window);
            copy.flip();
            return new MockTierObjectStoreResponse((InputStream)new ByteBufferInputStream(copy));
        }

        /** Reports an object as existing whenever the segment buffer's limit is non-zero. */
        public boolean objectExists(TierObjectStore.ObjectMetadata objectMetadata, TierObjectStore.FileType type) throws IOException, TierObjectStoreRetriableException {
            return this.segmentByteBuffer.limit() > 0;
        }

        public TierObjectStore.OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
            return TierObjectStore.OpaqueData.ZEROED;
        }

        /** Not supported; always throws {@link UnsupportedOperationException}. */
        public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<File> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<ByteBuffer> epochState) {
            throw new UnsupportedOperationException();
        }

        /** Not supported; always throws {@link UnsupportedOperationException}. */
        public void putInMemorySegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<ByteBuffer> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<ByteBuffer> epochState) {
            throw new UnsupportedOperationException();
        }

        /** Always fails with an {@link IOException} that has an empty message. */
        public void putObject(TierObjectStore.ObjectStoreMetadata objectMetadata, File file, TierObjectStore.FileType fileType) throws IOException {
            throw new IOException("");
        }

        /** No-op. */
        public void restoreObjectByCopy(TierObjectStore.ObjectMetadata objectMetadata, String key, VersionInformation lastLiveVersion) {
        }

        /** Always fails with an {@link IOException} that has an empty message. */
        public void putBuf(String key, Map<String, String> metadata, ByteBuffer buf) throws IOException {
            throw new IOException("");
        }

        /** No-op. */
        public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        }

        /** No-op. */
        public void deleteVersions(List<TierObjectStore.KeyAndVersion> keys) {
        }

        /** Response wrapper that simply hands back the stream it was constructed with. */
        class MockTierObjectStoreResponse
        implements TierObjectStoreResponse {
            private final InputStream is;

            MockTierObjectStoreResponse(InputStream is) {
                this.is = is;
            }

            public InputStream getInputStream() {
                return this.is;
            }

            /** No-op; the wrapped stream is not closed here. */
            public void close() {
            }
        }
    }

    /**
     * Minimal {@link DelayedOperation} wrapper around a {@link PendingFetch}, used by the tests to
     * drive completion of a fetch.
     */
    static class MockDelayedFetch
    extends DelayedOperation {
        PendingFetch fetch;

        /**
         * @param fetch the pending fetch whose completion state this operation mirrors
         */
        MockDelayedFetch(PendingFetch fetch) {
            // Passes 0 and an empty Scala Option to the DelayedOperation constructor.
            super(0L, OptionConverters.toScala(Optional.empty()));
            this.fetch = fetch;
        }

        /** Nothing to do on expiration. */
        public void onExpiration() {
        }

        /** Finishes the wrapped fetch; its result map is discarded. */
        public void onComplete() {
            this.fetch.finish();
        }

        /**
         * Forces completion only once the wrapped fetch reports it is complete.
         *
         * @return the result of {@code forceComplete()} if the fetch is complete, otherwise {@code false}
         */
        public boolean tryComplete() {
            return this.fetch.isComplete() && this.forceComplete();
        }
    }
}

