package kafka.log.remote;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.opentest4j.AssertionFailedError;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:kafka/log/remote/RemoteLogManagerTest.class */
public class RemoteLogManagerTest {
    // Broker configuration; rebuilt from dummy broker properties in setUp().
    private KafkaConfig config;
    // Leader-epoch checkpoint backed by a temp file; recreated in setUp().
    private LeaderEpochCheckpointFile checkpoint;
    // Mock clock shared by the metrics registry and the mock scheduler below.
    private final Time time = new MockTime();

    // Identity values for the broker under test. setUp() passes the same literal
    // values (0 and "dummyId") to the RemoteLogManager constructor.
    private final int brokerId = 0;
    private final String logDir = TestUtils.tempDirectory("kafka-").toString();
    private final String clusterId = "dummyId";

    // Custom property keys/values whose propagation is asserted in testStartup().
    // NOTE(review): presumably installed by createRLMConfig(), which is outside this view - confirm.
    private final String remoteLogStorageTestProp = "remote.log.storage.test";
    private final String remoteLogStorageTestVal = "storage.test";
    private final String remoteLogMetadataTestProp = "remote.log.metadata.test";
    private final String remoteLogMetadataTestVal = "metadata.test";
    private final String remoteLogMetadataCommonClientTestProp = "remote.log.metadata.common.client.common.client.test";
    private final String remoteLogMetadataCommonClientTestVal = "common.test";
    private final String remoteLogMetadataProducerTestProp = "remote.log.metadata.producer.producer.test";
    private final String remoteLogMetadataProducerTestVal = "producer.test";
    private final String remoteLogMetadataConsumerTestProp = "remote.log.metadata.consumer.consumer.test";
    private final String remoteLogMetadataConsumerTestVal = "consumer.test";
    private final String remoteLogMetadataTopicPartitionsNum = "1";

    // Mock collaborators handed to the RemoteLogManager through the factory overrides in setUp().
    private final RemoteStorageManager remoteStorageManager = (RemoteStorageManager) Mockito.mock(RemoteStorageManager.class);
    private final RemoteLogMetadataManager remoteLogMetadataManager = (RemoteLogMetadataManager) Mockito.mock(RemoteLogMetadataManager.class);
    private final RLMQuotaManager rlmCopyQuotaManager = (RLMQuotaManager) Mockito.mock(RLMQuotaManager.class);

    // Created in setUp() once the broker config is known.
    private BrokerTopicStats brokerTopicStats = null;
    private final Metrics metrics = new Metrics(this.time);
    // Manager under test; created in setUp() and closed in tearDown().
    private RemoteLogManager remoteLogManager = null;

    // Partitions used by the tests, one on topic "Leader" and one on topic "Follower".
    private final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
    private final String leaderTopic = "Leader";
    private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
    // Topic name -> topic id; populated in setUp() for the two partitions above.
    private final Map<String, Uuid> topicIds = new HashMap<>();
    private final TopicPartition tp = new TopicPartition("TestTopic", 5);

    // Leader-epoch history written to the checkpoint by several tests: epochs 0, 1, 2 starting at offsets 0, 100, 200.
    private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
    private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
    private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
    private final List<EpochEntry> totalEpochEntries = Arrays.asList(this.epochEntry0, this.epochEntry1, this.epochEntry2);

    // Receives the offset passed to the log-start-offset callback installed in setUp().
    private final AtomicLong currentLogStartOffset = new AtomicLong(0);
    // Log returned by the log-lookup callback installed in setUp().
    private UnifiedLog mockLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
    private final MockScheduler scheduler = new MockScheduler(this.time);

    /*
     * Compiler-generated switch map, materialised by the decompiler as a named class.
     * It holds an int array indexed by RemoteStorageManager.IndexType ordinal whose entries
     * are the dense case labels 1 (OFFSET), 2 (TIMESTAMP) and 3 (TRANSACTION).
     * NOTE(review): presumably consumed by a switch over IndexType elsewhere in this file - confirm
     * before renaming or removing it.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: kafka.log.remote.RemoteLogManagerTest$12, reason: invalid class name */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManagerTest$12.class */
    public static /* synthetic */ class AnonymousClass12 {
        // One slot per IndexType constant; slots that are never assigned below stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType = new int[RemoteStorageManager.IndexType.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at run time
            // (NoSuchFieldError) only leaves its own slot unassigned.
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.OFFSET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the slot keeps its default value.
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TIMESTAMP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the slot keeps its default value.
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TRANSACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the slot keeps its default value.
            }
        }
    }

    /**
     * Builds a fresh fixture before every test: a leader-epoch checkpoint on a temp file,
     * the topic-id lookup for the leader and follower partitions, a broker config with
     * remote storage enabled, and a RemoteLogManager whose factory methods hand back the
     * mocks owned by this test class.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    @BeforeEach
    void setUp() throws Exception {
        this.checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));

        final String leaderTopicName = this.leaderTopicIdPartition.topicPartition().topic();
        this.topicIds.put(leaderTopicName, this.leaderTopicIdPartition.topicId());
        final String followerTopicName = this.followerTopicIdPartition.topicPartition().topic();
        this.topicIds.put(followerTopicName, this.followerTopicIdPartition.topicId());

        final Properties brokerProps = kafka.utils.TestUtils.createDummyBrokerConfig();
        brokerProps.setProperty("remote.log.storage.system.enable", "true");
        brokerProps.setProperty("remote.log.manager.task.interval.ms", "100");
        createRLMConfig(brokerProps);
        this.config = KafkaConfig.fromProps(brokerProps);

        final boolean remoteStorageEnabled =
                KafkaConfig.fromProps(brokerProps).remoteLogManagerConfig().isRemoteStorageSystemEnabled();
        this.brokerTopicStats = new BrokerTopicStats(remoteStorageEnabled);

        this.remoteLogManager = new RemoteLogManager(
                this.config,
                this.brokerId,
                this.logDir,
                this.clusterId,
                this.time,
                // Every partition resolves to the shared mock log.
                partition -> Optional.of(this.mockLog),
                // Record the reported log start offset so tests can inspect it.
                (partition, startOffset) -> this.currentLogStartOffset.set(startOffset.longValue()),
                this.brokerTopicStats,
                this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.1

            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            public RLMQuotaManager createRLMCopyQuotaManager() {
                return RemoteLogManagerTest.this.rlmCopyQuotaManager;
            }

            public Duration quotaTimeout() {
                return Duration.ofMillis(100L);
            }

            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog unifiedLog) {
                return 0L;
            }
        };
    }

    /**
     * Closes the manager created in {@link #setUp()} and clears the Yammer metrics.
     *
     * <p>The metrics are cleared in a {@code finally} block so that a failure while
     * closing the manager cannot leave metrics behind for subsequent tests.
     */
    @AfterEach
    void tearDown() {
        try {
            if (this.remoteLogManager != null) {
                this.remoteLogManager.close();
                this.remoteLogManager = null;
            }
        } finally {
            kafka.utils.TestUtils.clearYammerMetrics();
        }
    }

    /**
     * Checks that getLeaderEpochEntries returns every checkpointed epoch for offsets 0..300
     * and only the entry of epoch 1 for offsets 100..200.
     */
    @Test
    void testGetLeaderEpochCheckpoint() {
        this.checkpoint.write(this.totalEpochEntries);
        final LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        Assertions.assertEquals(
                this.totalEpochEntries,
                this.remoteLogManager.getLeaderEpochEntries(this.mockLog, 0L, 300L));

        final List<EpochEntry> narrowed = this.remoteLogManager.getLeaderEpochEntries(this.mockLog, 100L, 200L);
        Assertions.assertEquals(1, narrowed.size());
        Assertions.assertEquals(this.epochEntry1, narrowed.get(0));
    }

    /**
     * With no stubbing on the metadata manager mock, findHighestRemoteOffset is expected
     * to report offset -1 at epoch -1.
     *
     * @throws RemoteStorageException if the lookup fails
     */
    @Test
    void testFindHighestRemoteOffsetOnEmptyRemoteStorage() throws RemoteStorageException {
        final List<EpochEntry> epochs = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 500L));
        this.checkpoint.write(epochs);
        final LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        final OffsetAndEpoch expected = new OffsetAndEpoch(-1L, -1);
        final TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), this.tp);
        Assertions.assertEquals(expected, this.remoteLogManager.findHighestRemoteOffset(partition, this.mockLog));
    }

    /**
     * Remote storage reports offset 200 for epoch 0 and nothing for any other epoch;
     * with local epochs starting at offsets 0 and 500 the expected result is (200, epoch 0).
     *
     * @throws RemoteStorageException if the lookup fails
     */
    @Test
    void testFindHighestRemoteOffset() throws RemoteStorageException {
        final List<EpochEntry> epochs = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 500L));
        this.checkpoint.write(epochs);
        final LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        final TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), this.tp);
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.eq(partition), ArgumentMatchers.anyInt()))
                .thenAnswer(invocation -> {
                    final Integer requestedEpoch = invocation.getArgument(1, Integer.class);
                    if (requestedEpoch.intValue() == 0) {
                        return Optional.of(200L);
                    }
                    return Optional.empty();
                });

        final OffsetAndEpoch expected = new OffsetAndEpoch(200L, 0);
        Assertions.assertEquals(expected, this.remoteLogManager.findHighestRemoteOffset(partition, this.mockLog));
    }

    /**
     * Remote storage reports offset 200 for epoch 0, but the local epoch 1 already starts
     * at offset 150; the expected result is offset 149 at epoch 0.
     *
     * @throws RemoteStorageException if the lookup fails
     */
    @Test
    void testFindHighestRemoteOffsetWithUncleanLeaderElection() throws RemoteStorageException {
        final List<EpochEntry> epochs = Arrays.asList(
                new EpochEntry(0, 0L),
                new EpochEntry(1, 150L),
                new EpochEntry(2, 300L));
        this.checkpoint.write(epochs);
        final LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        final TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), this.tp);
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.eq(partition), ArgumentMatchers.anyInt()))
                .thenAnswer(invocation -> {
                    final Integer requestedEpoch = invocation.getArgument(1, Integer.class);
                    if (requestedEpoch.intValue() == 0) {
                        return Optional.of(200L);
                    }
                    return Optional.empty();
                });

        final OffsetAndEpoch expected = new OffsetAndEpoch(149L, 0);
        Assertions.assertEquals(expected, this.remoteLogManager.findHighestRemoteOffset(partition, this.mockLog));
    }

    /**
     * With a user-defined metadata-manager prefix, a property carrying that prefix must
     * show up in the metadata-manager props under its un-prefixed key, while an unrelated
     * "remote.log.metadata.y" property must not be present.
     */
    @Test
    void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
        final Properties props = new Properties();
        props.put("remote.log.metadata.manager.impl.prefix", "config.prefix");
        props.put("config.prefixkey", "world");
        props.put("remote.log.metadata.y", "z");

        final Map<String, ?> metadataManagerProps = createRLMConfig(props).remoteLogMetadataManagerProps();

        Assertions.assertEquals(props.get("config.prefixkey"), metadataManagerProps.get("key"));
        Assertions.assertFalse(metadataManagerProps.containsKey("remote.log.metadata.y"));
    }

    /**
     * With a user-defined storage-manager prefix, a property carrying that prefix must
     * show up in the storage-manager props under its un-prefixed key, while an unrelated
     * "remote.storage.manager.y" property must not be present.
     */
    @Test
    void testRemoteStorageManagerWithUserDefinedConfigs() {
        final Properties props = new Properties();
        props.put("remote.log.storage.manager.impl.prefix", "config.prefix");
        props.put("config.prefixkey", "world");
        props.put("remote.storage.manager.y", "z");

        final Map<String, ?> storageManagerProps = createRLMConfig(props).remoteStorageManagerProps();

        Assertions.assertEquals(props.get("config.prefixkey"), storageManagerProps.get("key"));
        Assertions.assertFalse(storageManagerProps.containsKey("remote.storage.manager.y"));
    }

    /**
     * After an endpoint is registered and the manager is started, the metadata manager
     * must be configured exactly once with the endpoint's bootstrap server and security
     * protocol, plus the cluster id and broker id.
     */
    @Test
    void testRemoteLogMetadataManagerWithEndpointConfig() {
        final EndPoint endPoint = new EndPoint(
                "localhost",
                Integer.parseInt("1234"),
                new ListenerName("PLAINTEXT"),
                SecurityProtocol.PLAINTEXT);
        this.remoteLogManager.onEndPointCreated(endPoint);
        this.remoteLogManager.startup();

        final ArgumentCaptor<Map<String, Object>> configCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(configCaptor.capture());

        final Map<String, Object> configs = configCaptor.getValue();
        Assertions.assertEquals("localhost:1234", configs.get("remote.log.metadata.common.client.bootstrap.servers"));
        Assertions.assertEquals("PLAINTEXT", configs.get("remote.log.metadata.common.client.security.protocol"));
        Assertions.assertEquals("dummyId", configs.get("cluster.id"));
        Assertions.assertEquals(0, configs.get("broker.id"));
    }

    /**
     * A security protocol supplied through the "rlmm.config." prefix must override the
     * protocol derived from the registered endpoint: the metadata manager is expected to
     * be configured with "SSL" although the endpoint itself is PLAINTEXT.
     *
     * <p>The manager created here is scoped to the test and closed by try-with-resources,
     * including when an assertion fails.
     *
     * @throws IOException if closing the manager fails
     */
    @Test
    void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
        Properties props = new Properties();
        props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
        // Override the common-client security protocol via the metadata-manager config prefix.
        props.put("rlmm.config.remote.log.metadata.common.client.security.protocol", "SSL");
        createRLMConfig(props);

        try (RemoteLogManager manager = new RemoteLogManager(
                KafkaConfig.fromProps(props),
                0,
                this.logDir,
                "dummyId",
                this.time,
                topicPartition -> Optional.of(this.mockLog),
                (topicPartition, offset) -> { },
                this.brokerTopicStats,
                this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.2

            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            manager.onEndPointCreated(new EndPoint("localhost", Integer.parseInt("1234"), new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT));
            manager.startup();

            ArgumentCaptor<Map<String, Object>> configCaptor = ArgumentCaptor.forClass(Map.class);
            Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(configCaptor.capture());

            Map<String, Object> configs = configCaptor.getValue();
            Assertions.assertEquals("localhost:1234", configs.get("remote.log.metadata.common.client.bootstrap.servers"));
            Assertions.assertEquals("SSL", configs.get("remote.log.metadata.common.client.security.protocol"));
            Assertions.assertEquals("dummyId", configs.get("cluster.id"));
            Assertions.assertEquals(0, configs.get("broker.id"));
        }
    }

    /**
     * startup() must configure the remote storage manager and the remote log metadata
     * manager exactly once each, passing the broker id and the custom test properties
     * appropriate to each component.
     */
    @Test
    void testStartup() {
        this.remoteLogManager.startup();

        // Remote storage manager configuration.
        final ArgumentCaptor<Map<String, Object>> storageCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).configure(storageCaptor.capture());
        final Map<String, Object> storageConfigs = storageCaptor.getValue();
        Assertions.assertEquals(0, storageConfigs.get("broker.id"));
        Assertions.assertEquals("storage.test", storageConfigs.get("remote.log.storage.test"));

        // Remote log metadata manager configuration.
        final ArgumentCaptor<Map<String, Object>> metadataCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(metadataCaptor.capture());
        final Map<String, Object> metadataConfigs = metadataCaptor.getValue();
        Assertions.assertEquals(0, metadataConfigs.get("broker.id"));
        Assertions.assertEquals(this.logDir, metadataConfigs.get("log.dir"));
        Assertions.assertEquals("1", metadataConfigs.get("remote.log.metadata.topic.num.partitions"));
        Assertions.assertEquals("metadata.test", metadataConfigs.get("remote.log.metadata.test"));
        Assertions.assertEquals("consumer.test", metadataConfigs.get("remote.log.metadata.consumer.consumer.test"));
        Assertions.assertEquals("producer.test", metadataConfigs.get("remote.log.metadata.producer.producer.test"));
        Assertions.assertEquals("common.test", metadataConfigs.get("remote.log.metadata.common.client.common.client.test"));
    }

    /**
     * Copy scenario in which the last stable offset (250) and the log end offset (300)
     * both lie beyond the active segment's base offset (150).
     *
     * @throws Exception if the shared copy scenario fails
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception {
        final long oldSegmentStartOffset = 0L;
        final long nextSegmentStartOffset = 150L;
        final long lastStableOffset = 250L;
        final long logEndOffset = 300L;
        assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lastStableOffset, logEndOffset);
    }

    /**
     * Copy scenario in which the last stable offset and the log end offset both equal
     * the active segment's base offset (150).
     *
     * @throws Exception if the shared copy scenario fails
     */
    @Test
    void testCopyLogSegmentToRemoteForStaleTopic() throws Exception {
        final long oldSegmentStartOffset = 0L;
        final long nextSegmentStartOffset = 150L;
        final long lastStableOffset = 150L;
        final long logEndOffset = 150L;
        assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lastStableOffset, logEndOffset);
    }

    /**
     * Runs one leader-side copy pass over a mocked log made of a rolled segment and an
     * active segment, then verifies that exactly the rolled segment was copied: the
     * metadata added to the metadata manager, the data handed to the storage manager,
     * the follow-up metadata update, the highest remote offset recorded on the log and
     * the remote-copy metrics.
     *
     * @param j  base offset of the rolled segment; also stubbed as the log start offset
     * @param j2 base offset of the active segment, stubbed as the rolled segment's next offset
     * @param j3 last stable offset reported by the mocked log
     * @param j4 log end offset reported by the mocked log
     * @throws Exception if the fixture cannot be built or the copy pass fails
     */
    private void assertCopyExpectedLogSegmentsToRemote(long j, long j2, long j3, long j4) throws Exception {
        final long oldSegmentStartOffset = j;
        final long nextSegmentStartOffset = j2;
        final long lastStableOffset = j3;
        final long logEndOffset = j4;
        final long oldSegmentEndOffset = nextSegmentStartOffset - 1;
        final String topic = this.leaderTopicIdPartition.topic();

        // Leader epoch cache and remote state: nothing has been uploaded yet.
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(-1L));

        final File segmentFile = TestUtils.tempFile();
        final File producerSnapshotFile = TestUtils.tempFile();
        final File indexDir = TestUtils.tempDirectory();

        // Two segments: the rolled one that should be copied and the active one that should not.
        final LogSegment oldSegment = Mockito.mock(LogSegment.class);
        final LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
        Mockito.verify(oldSegment, Mockito.times(0)).readNextOffset();
        Mockito.verify(activeSegment, Mockito.times(0)).readNextOffset();

        final FileRecords segmentRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(segmentRecords);
        Mockito.when(segmentRecords.file()).thenReturn(segmentFile);
        Mockito.when(segmentRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        final ProducerStateManager producerStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(lastStableOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);

        // Real index objects for the rolled segment, backed by files in a temp directory.
        final OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
        final TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
        final File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, oldSegmentStartOffset, "");
        txnIndexFile.createNewFile();
        final TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIndex);

        // Metadata add/update complete immediately; the storage manager returns no custom metadata.
        final CompletableFuture<Void> completedFuture = new CompletableFuture<>();
        completedFuture.complete(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.empty());

        // All remote-copy metrics start at zero.
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

        // RLMTask is an inner class of RemoteLogManager, so it is created through a
        // qualified instance creation on the manager under test.
        final RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote(this.mockLog);

        // Metadata registered for the copied segment: offsets [j, j2 - 1] with epochs 0 and 1.
        final ArgumentCaptor<RemoteLogSegmentMetadata> addedMetadata = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteLogMetadataManager).addRemoteLogSegmentMetadata(addedMetadata.capture());
        final TreeMap<Integer, Long> expectedLeaderEpochs = new TreeMap<>();
        expectedLeaderEpochs.put(this.epochEntry0.epoch, this.epochEntry0.startOffset);
        expectedLeaderEpochs.put(this.epochEntry1.epoch, this.epochEntry1.startOffset);
        verifyRemoteLogSegmentMetadata(addedMetadata.getValue(), oldSegmentStartOffset, oldSegmentEndOffset, expectedLeaderEpochs);

        // Data handed to the storage manager for that same metadata.
        final ArgumentCaptor<RemoteLogSegmentMetadata> copiedMetadata = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        final ArgumentCaptor<LogSegmentData> copiedData = ArgumentCaptor.forClass(LogSegmentData.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).copyLogSegmentData(copiedMetadata.capture(), copiedData.capture());
        Assertions.assertEquals(addedMetadata.getValue(), copiedMetadata.getValue());
        verifyLogSegmentData(copiedData.getValue(), offsetIndex, timeIndex, txnIndex, segmentFile, producerSnapshotFile, Arrays.asList(this.epochEntry0, this.epochEntry1));

        // Follow-up metadata update after the copy.
        final ArgumentCaptor<RemoteLogSegmentMetadataUpdate> metadataUpdate = ArgumentCaptor.forClass(RemoteLogSegmentMetadataUpdate.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).updateRemoteLogSegmentMetadata(metadataUpdate.capture());
        verifyRemoteLogSegmentMetadataUpdate(metadataUpdate.getValue());

        // The log is told twice about the highest remote offset; the last value is the copied segment's end offset.
        final ArgumentCaptor<Long> highestRemoteOffset = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(this.mockLog, Mockito.times(2)).updateHighestOffsetInRemoteStorage(highestRemoteOffset.capture());
        Assertions.assertEquals(oldSegmentEndOffset, highestRemoteOffset.getValue());

        // One successful copy request of 10 bytes, no failures.
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).remoteCopyRequestRate().count());
        Assertions.assertEquals(10L, this.brokerTopicStats.topicStats(topic).remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(10L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    @Test
    void testCustomMetadataSizeExceedsLimit() throws Exception {
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(-1L));
        File tempFile = TestUtils.tempFile();
        File tempFile2 = TestUtils.tempFile();
        File tempDirectory = TestUtils.tempDirectory();
        LogSegment logSegment = (LogSegment) Mockito.mock(LogSegment.class);
        LogSegment logSegment2 = (LogSegment) Mockito.mock(LogSegment.class);
        Mockito.when(Long.valueOf(logSegment.baseOffset())).thenReturn(0L);
        Mockito.when(Long.valueOf(logSegment2.baseOffset())).thenReturn(150L);
        ((LogSegment) Mockito.verify(logSegment, Mockito.times(0))).readNextOffset();
        ((LogSegment) Mockito.verify(logSegment2, Mockito.times(0))).readNextOffset();
        FileRecords fileRecords = (FileRecords) Mockito.mock(FileRecords.class);
        Mockito.when(logSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(tempFile);
        Mockito.when(Integer.valueOf(fileRecords.sizeInBytes())).thenReturn(10);
        Mockito.when(Long.valueOf(logSegment.readNextOffset())).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(logSegment2);
        Mockito.when(Long.valueOf(this.mockLog.logStartOffset())).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(logSegment, logSegment2)));
        ProducerStateManager producerStateManager = (ProducerStateManager) Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(tempFile2));
        Mockito.when(Long.valueOf(this.mockLog.lastStableOffset())).thenReturn(150L);
        Mockito.when(Long.valueOf(this.mockLog.logEndOffset())).thenReturn(150L);
        OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDirectory, 0L, ""), 0L, 1000).get();
        TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDirectory, 0L, ""), 0L, 1500).get();
        File transactionIndexFile = UnifiedLog.transactionIndexFile(tempDirectory, 0L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex = new TransactionIndex(0L, transactionIndexFile);
        Mockito.when(logSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(logSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(logSegment.txnIndex()).thenReturn(transactionIndex);
        RemoteLogSegmentMetadata.CustomMetadata customMetadata = new RemoteLogSegmentMetadata.CustomMetadata(new byte[128 * 2]);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completableFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData) ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.of(customMetadata));
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        remoteLogManager.getClass();
        RemoteLogManager.RLMTask rLMTask = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition, 128);
        rLMTask.convertToLeader(2);
        rLMTask.copyLogSegmentsToRemote(this.mockLog);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager)).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) forClass.capture());
        ((RemoteStorageManager) Mockito.verify(this.remoteStorageManager, Mockito.times(1))).deleteLogSegmentData((RemoteLogSegmentMetadata) ArgumentMatchers.eq(((RemoteLogSegmentMetadata) forClass.getValue()).createWithUpdates(new RemoteLogSegmentMetadataUpdate(((RemoteLogSegmentMetadata) forClass.getValue()).remoteLogSegmentId(), this.time.milliseconds(), Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 0))));
        Assertions.assertTrue(rLMTask.isCancelled());
        ((RemoteLogMetadataManager) Mockito.verify(this.remoteLogMetadataManager, Mockito.never())).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate) ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Checks the clean-up path taken when copying a segment to remote storage fails.
     *
     * <p>{@code copyLogSegmentData} is stubbed to throw a {@code RemoteStorageException}. After
     * {@code copyLogSegmentsToRemote} has run, the test expects that:
     * <ul>
     *   <li>the segment metadata that was added is passed to {@code deleteLogSegmentData} exactly once;</li>
     *   <li>no {@code updateRemoteLogSegmentMetadata} call is made;</li>
     *   <li>the copy-request and failed-copy-request meters read 1 and the copied-bytes meter reads 0,
     *       both for the leader topic and for all topics.</li>
     * </ul>
     *
     * @throws Exception if building the fixture or running the task fails unexpectedly
     */
    @Test
    void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
        // --- mocked log identity and leader-epoch state ---
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt()))
                .thenReturn(Optional.of(-1L));

        // --- backing files for the segment, the producer snapshot and the indexes ---
        File segmentFile = TestUtils.tempFile();
        File producerSnapshotFile = TestUtils.tempFile();
        File indexDir = TestUtils.tempDirectory();

        // --- two segments: [0, 150) and the active one starting at 150 ---
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);
        Mockito.verify(oldSegment, Mockito.never()).readNextOffset();
        Mockito.verify(activeSegment, Mockito.never()).readNextOffset();

        FileRecords segmentRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(segmentRecords);
        Mockito.when(segmentRecords.file()).thenReturn(segmentFile);
        Mockito.when(segmentRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(150L);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong()))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager stateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(stateManager);
        Mockito.when(stateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(150L);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(150L);

        // --- real index objects for the old segment ---
        OffsetIndex offsetIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        txnIndexFile.createNewFile();
        TransactionIndex txnIdx = new TransactionIndex(0L, txnIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIdx);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIdx);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIdx);

        // --- metadata add succeeds immediately, quota is available, the copy itself blows up ---
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class)))
                .thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(this.rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class)))
                .thenThrow(new RemoteStorageException(RemoteLogReaderTest.TOPIC));

        // --- run the copy as leader ---
        RemoteLogManager manager = this.remoteLogManager;
        // NOTE(review): this bare getClass() only dereferences the manager (fails fast on null);
        // presumably left over from how the task is constructed - confirm before removing.
        manager.getClass();
        RemoteLogManager.RLMTask copyTask = new RemoteLogManager.RLMTask(manager, this.leaderTopicIdPartition, 128);
        copyTask.convertToLeader(this.epochEntry2.epoch);
        copyTask.copyLogSegmentsToRemote(this.mockLog);

        // --- the metadata that was added must be the one deleted; nothing may be updated ---
        ArgumentCaptor<RemoteLogSegmentMetadata> addedMetadata = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteLogMetadataManager).addRemoteLogSegmentMetadata(addedMetadata.capture());
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).deleteLogSegmentData(ArgumentMatchers.eq(addedMetadata.getValue()));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never())
                .updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));

        // --- per-topic meters ---
        String leaderTopic = this.leaderTopicIdPartition.topic();
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(leaderTopic).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(leaderTopic).remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(leaderTopic).failedRemoteCopyRequestRate().count());

        // --- all-topics meters ---
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Expects {@code onLeadershipChange} to throw a {@code KafkaException} when it is invoked
     * with one leader and one follower partition. This test method itself performs no start-up
     * call on the manager beforehand.
     *
     * <p>The string passed as the last argument is the assertion's failure message, i.e. it is
     * reported when no {@code KafkaException} is thrown; it is not compared with the exception text.
     */
    @Test
    void testLeadershipChangesWithoutRemoteLogManagerConfiguring() {
        Assertions.assertThrows(
                KafkaException.class,
                () -> this.remoteLogManager.onLeadershipChange(
                        Collections.singleton(mockPartition(this.leaderTopicIdPartition)),
                        Collections.singleton(mockPartition(this.followerTopicIdPartition)),
                        this.topicIds),
                "RemoteLogManager is not configured when remote storage system is enabled");
    }

    /**
     * Exercises the {@code RemoteLogManagerTasksAvgIdlePercent}, {@code RemoteLogMetadataCount} and
     * {@code RemoteLogSizeBytes} gauges across a leadership change.
     *
     * <p>Flow of the test:
     * <ol>
     *   <li>Start the manager and stub the log, segment, index and metadata-manager mocks.
     *       {@code copyLogSegmentData} and {@code deleteLogSegmentData} are each stubbed to wait on
     *       their own latch (for at most 5 seconds) so the test controls when they return.</li>
     *   <li>Before {@code onLeadershipChange}: idle percent must be exactly 1.0 and all count/size
     *       gauges must read 0.</li>
     *   <li>After {@code onLeadershipChange}: idle percent must be below 1.0. Once the copy latch is
     *       released, the count gauges must reach 3 and the size gauges 3072.</li>
     *   <li>Once the delete latch is released, the count and size gauges must return to 0.</li>
     * </ol>
     *
     * <p>The size-gauge failure messages are supplied lazily so that, on a timeout, they report
     * the gauge values observed at failure time rather than the values read before waiting began.
     *
     * @throws Exception if fixture setup fails or one of the awaited conditions times out
     */
    @Test
    void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception {
        this.remoteLogManager.startup();
        // Expected gauge values once the stubbed remote segments are accounted for.
        // NOTE(review): presumably 3 segments of 1024 bytes each, matching the (3, 100, 1024)
        // arguments passed to listRemoteLogSegmentMetadata below - confirm against that helper.
        final long expectedSegmentCount = 3L;
        final long expectedRemoteSizeBytes = 3072L;

        // --- mocked log identity and leader-epoch state ---
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.parentDir()).thenReturn("dir1");
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache())
                .thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt()))
                .thenReturn(Optional.of(0L));

        // --- backing files ---
        File segmentFile = TestUtils.tempFile();
        File producerSnapshotFile = TestUtils.tempFile();
        File indexDir = TestUtils.tempDirectory();

        // --- two segments: [0, 150) and the active one starting at 150 ---
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);

        FileRecords segmentRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(segmentRecords);
        Mockito.when(segmentRecords.file()).thenReturn(segmentFile);
        Mockito.when(segmentRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(150L);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong()))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager stateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(stateManager);
        Mockito.when(stateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(500L);

        // --- log config: size-based retention of 100 bytes, time-based retention disabled (-1) ---
        Map<String, Long> retentionProps = new HashMap<>();
        retentionProps.put("retention.bytes", 100L);
        retentionProps.put("retention.ms", -1L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(retentionProps));

        // --- real index objects for the old segment ---
        OffsetIndex offsetIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        txnIndexFile.createNewFile();
        TransactionIndex txnIdx = new TransactionIndex(0L, txnIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIdx);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIdx);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIdx);

        // --- metadata add/update complete immediately ---
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class)))
                .thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                .thenReturn(CompletableFuture.completedFuture(null));

        // --- copy and delete are gated on latches owned by this test ---
        CountDownLatch copyGate = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            copyGate.await(5000L, TimeUnit.MILLISECONDS);
            return Optional.empty();
        }).when(this.remoteStorageManager).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));

        CountDownLatch deleteGate = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            deleteGate.await(5000L, TimeUnit.MILLISECONDS);
            return null;
        }).when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));

        Partition leaderPartition = mockPartition(this.leaderTopicIdPartition);
        Partition followerPartition = mockPartition(this.followerTopicIdPartition);

        // --- remote segment listings; the un-scoped listing is empty from its second call on ---
        List<RemoteLogSegmentMetadata> remoteSegments =
                listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 3, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition))
                .thenReturn(remoteSegments.iterator())
                .thenReturn(Collections.emptyIterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0))
                .thenReturn(remoteSegments.iterator())
                .thenReturn(remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 1)).thenReturn(remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 2)).thenReturn(remoteSegments.iterator());

        // --- baseline: fully idle, nothing counted ---
        double idleBeforeLeadership = (Double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent");
        Assertions.assertEquals(1.0d, idleBeforeLeadership);
        Assertions.assertEquals(0L, safeLongYammerMetricValue("RemoteLogMetadataCount,topic=Leader"));
        Assertions.assertEquals(0L, safeLongYammerMetricValue("RemoteLogSizeBytes,topic=Leader"));
        Assertions.assertEquals(0L, safeLongYammerMetricValue("RemoteLogMetadataCount"));
        Assertions.assertEquals(0L, safeLongYammerMetricValue("RemoteLogSizeBytes"));

        this.remoteLogManager.onLeadershipChange(Collections.singleton(leaderPartition), Collections.singleton(followerPartition), this.topicIds);

        // --- after the leadership change the idle percentage must have dropped ---
        double idleAfterLeadership = (Double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent");
        Assertions.assertTrue(idleAfterLeadership < 1.0d);

        // --- let the copy return, then wait for the gauges to pick up the remote segments ---
        copyGate.countDown();
        TestUtils.waitForCondition(
                () -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=Leader") == expectedSegmentCount
                        && safeLongYammerMetricValue("RemoteLogMetadataCount") == expectedSegmentCount,
                "Didn't show the expected RemoteLogMetadataCount metric value.");
        TestUtils.waitForCondition(
                () -> expectedRemoteSizeBytes == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=Leader")
                        && expectedRemoteSizeBytes == safeLongYammerMetricValue("RemoteLogSizeBytes"),
                // Lazy: evaluated only on failure, so the reported values are current.
                () -> String.format("Expected to find 3072 for RemoteLogSizeBytes metric value, but found %d for 'Leader' topic and %d for all topic",
                        safeLongYammerMetricValue("RemoteLogSizeBytes,topic=Leader"),
                        safeLongYammerMetricValue("RemoteLogSizeBytes")));

        // --- let the delete return, then wait for the gauges to fall back to zero ---
        deleteGate.countDown();
        TestUtils.waitForCondition(
                () -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=Leader") == 0
                        && safeLongYammerMetricValue("RemoteLogMetadataCount") == 0,
                "Didn't reset to 0 for RemoteLogMetadataCount metric value when no remote log metadata.");
        TestUtils.waitForCondition(
                () -> 0 == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=Leader")
                        && 0 == safeLongYammerMetricValue("RemoteLogSizeBytes"),
                // Lazy: evaluated only on failure, so the reported values are current.
                () -> String.format("Didn't reset to 0 for RemoteLogSizeBytes metric value when no remote log metadata - found %d for 'Leader' topic and %d for all topic.",
                        safeLongYammerMetricValue("RemoteLogSizeBytes,topic=Leader"),
                        safeLongYammerMetricValue("RemoteLogSizeBytes")));
    }

    /**
     * Checks that {@code updateHighestOffsetInRemoteStorage} is invoked on both the original log
     * mock (reporting parent dir {@code "dir1"}) and a replacement log mock (reporting
     * {@code "dir2"}) when a second leadership change arrives for the same partition.
     *
     * <p>Flow of the test:
     * <ol>
     *   <li>Start the manager, stub the mocks, and gate {@code copyLogSegmentData} on a latch.
     *       {@code highestOffsetForEpoch} answers 0 on its first call and 149 afterwards.</li>
     *   <li>Trigger {@code onLeadershipChange} and wait until the first log mock has received
     *       exactly one {@code updateHighestOffsetInRemoteStorage} call carrying 0.</li>
     *   <li>Clear that mock's recorded invocations, replace {@code this.mockLog} with a fresh mock
     *       for {@code "dir2"}, trigger {@code onLeadershipChange} again with the same partition
     *       mock, and release the copy latch.</li>
     *   <li>Wait until each of the two log mocks has received exactly one
     *       {@code updateHighestOffsetInRemoteStorage} call carrying 149.</li>
     * </ol>
     *
     * @throws Exception if fixture setup fails or one of the awaited conditions times out
     */
    @Test
    void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws Exception {
        this.remoteLogManager.startup();
        final long activeSegmentBaseOffset = 150L;

        // --- mocked log identity and leader-epoch state ---
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.parentDir()).thenReturn("dir1");
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt()))
                .thenReturn(Optional.of(0L))
                .thenReturn(Optional.of(activeSegmentBaseOffset - 1));

        // --- backing files ---
        File segmentFile = TestUtils.tempFile();
        File producerSnapshotFile = TestUtils.tempFile();
        File indexDir = TestUtils.tempDirectory();

        // --- two segments: [0, 150) and the active one starting at 150 ---
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);

        FileRecords segmentRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(segmentRecords);
        Mockito.when(segmentRecords.file()).thenReturn(segmentFile);
        Mockito.when(segmentRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(150L);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong()))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager stateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(stateManager);
        Mockito.when(stateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(500L);

        // --- log config shared by both log mocks ---
        Map<String, Long> retentionProps = new HashMap<>();
        retentionProps.put("retention.bytes", 100L);
        retentionProps.put("retention.ms", -1L);
        LogConfig sharedLogConfig = new LogConfig(retentionProps);
        Mockito.when(this.mockLog.config()).thenReturn(sharedLogConfig);

        // --- real index objects for the old segment ---
        OffsetIndex offsetIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        txnIndexFile.createNewFile();
        TransactionIndex txnIdx = new TransactionIndex(0L, txnIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIdx);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIdx);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIdx);

        // --- metadata add/update complete immediately; the copy is gated on a latch ---
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class)))
                .thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                .thenReturn(CompletableFuture.completedFuture(null));
        CountDownLatch copyGate = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            copyGate.await(5000L, TimeUnit.MILLISECONDS);
            return Optional.empty();
        }).when(this.remoteStorageManager).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));

        Partition leaderPartition = mockPartition(this.leaderTopicIdPartition);

        // --- remote segment listings ---
        List<RemoteLogSegmentMetadata> remoteSegments =
                listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 3, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0))
                .thenReturn(remoteSegments.iterator())
                .thenReturn(remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 1)).thenReturn(remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 2)).thenReturn(remoteSegments.iterator());

        // --- first leadership change: the dir1 log must be told the highest remote offset is 0 ---
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet(), this.topicIds);
        TestUtils.waitForCondition(() -> {
            ArgumentCaptor<Long> reportedOffset = ArgumentCaptor.forClass(Long.class);
            Mockito.verify(this.mockLog, Mockito.times(1)).updateHighestOffsetInRemoteStorage(reportedOffset.capture());
            return reportedOffset.getValue().longValue() == 0L;
        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) get invoked for dir1 log");

        // --- swap in a second log mock that reports a different parent directory ---
        UnifiedLog dir1Log = this.mockLog;
        Mockito.clearInvocations(dir1Log);
        this.mockLog = Mockito.mock(UnifiedLog.class);
        Mockito.when(this.mockLog.parentDir()).thenReturn("dir2");
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.config()).thenReturn(sharedLogConfig);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(500L);

        // --- second leadership change, then let the gated copy return ---
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet(), this.topicIds);
        copyGate.countDown();

        // --- both log mocks must now be told the highest remote offset is 149 ---
        TestUtils.waitForCondition(() -> {
            ArgumentCaptor<Long> reportedOffset = ArgumentCaptor.forClass(Long.class);
            Mockito.verify(dir1Log, Mockito.times(1)).updateHighestOffsetInRemoteStorage(reportedOffset.capture());
            return reportedOffset.getValue().longValue() == activeSegmentBaseOffset - 1;
        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir1 log");
        TestUtils.waitForCondition(() -> {
            ArgumentCaptor<Long> reportedOffset = ArgumentCaptor.forClass(Long.class);
            Mockito.verify(this.mockLog, Mockito.times(1)).updateHighestOffsetInRemoteStorage(reportedOffset.capture());
            return reportedOffset.getValue().longValue() == activeSegmentBaseOffset - 1;
        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir2 log");
    }

    @Test
    void testRemoteLogManagerRemoteMetrics() throws Exception {
        this.remoteLogManager.startup();
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.parentDir()).thenReturn("dir1");
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));
        File tempFile = TestUtils.tempFile();
        File tempFile2 = TestUtils.tempFile();
        File tempDirectory = TestUtils.tempDirectory();
        LogSegment logSegment = (LogSegment) Mockito.mock(LogSegment.class);
        LogSegment logSegment2 = (LogSegment) Mockito.mock(LogSegment.class);
        LogSegment logSegment3 = (LogSegment) Mockito.mock(LogSegment.class);
        Mockito.when(Long.valueOf(logSegment.baseOffset())).thenReturn(0L);
        Mockito.when(Long.valueOf(logSegment2.baseOffset())).thenReturn(75L);
        Mockito.when(Long.valueOf(logSegment3.baseOffset())).thenReturn(150L);
        FileRecords fileRecords = (FileRecords) Mockito.mock(FileRecords.class);
        Mockito.when(logSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(tempFile);
        Mockito.when(Integer.valueOf(fileRecords.sizeInBytes())).thenReturn(10);
        Mockito.when(Long.valueOf(logSegment.readNextOffset())).thenReturn(75L);
        FileRecords fileRecords2 = (FileRecords) Mockito.mock(FileRecords.class);
        Mockito.when(logSegment2.log()).thenReturn(fileRecords2);
        Mockito.when(fileRecords2.file()).thenReturn(tempFile);
        Mockito.when(Integer.valueOf(fileRecords2.sizeInBytes())).thenReturn(10);
        Mockito.when(Long.valueOf(logSegment2.readNextOffset())).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(logSegment3);
        Mockito.when(Long.valueOf(this.mockLog.logStartOffset())).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(logSegment, logSegment2, logSegment3)));
        ProducerStateManager producerStateManager = (ProducerStateManager) Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(tempFile2));
        Mockito.when(Long.valueOf(this.mockLog.lastStableOffset())).thenReturn(250L);
        HashMap hashMap = new HashMap();
        hashMap.put("retention.bytes", 1000000L);
        hashMap.put("retention.ms", -1L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(hashMap));
        OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDirectory, 0L, ""), 0L, 1000).get();
        TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDirectory, 0L, ""), 0L, 1500).get();
        File transactionIndexFile = UnifiedLog.transactionIndexFile(tempDirectory, 0L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex = new TransactionIndex(0L, transactionIndexFile);
        Mockito.when(logSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(logSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(logSegment.txnIndex()).thenReturn(transactionIndex);
        OffsetIndex offsetIndex2 = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDirectory, 75L, ""), 75L, 1000).get();
        TimeIndex timeIndex2 = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDirectory, 75L, ""), 75L, 1500).get();
        File transactionIndexFile2 = UnifiedLog.transactionIndexFile(tempDirectory, 75L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex2 = new TransactionIndex(75L, transactionIndexFile2);
        Mockito.when(logSegment2.timeIndex()).thenReturn(timeIndex2);
        Mockito.when(logSegment2.offsetIndex()).thenReturn(offsetIndex2);
        Mockito.when(logSegment2.txnIndex()).thenReturn(transactionIndex2);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completableFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate) ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completableFuture);
        Iterator<RemoteLogSegmentMetadata> it = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED).iterator();
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(it);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 2)).thenReturn(it);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 1)).thenReturn(it);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocationOnMock -> {
            this.time.sleep(1000L);
            return it;
        }).thenAnswer(invocationOnMock2 -> {
            countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
            return Collections.emptyIterator();
        });
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        ((RemoteStorageManager) Mockito.doAnswer(invocationOnMock3 -> {
            return Optional.empty();
        }).doAnswer(invocationOnMock4 -> {
            countDownLatch2.await(5000L, TimeUnit.MILLISECONDS);
            return Optional.empty();
        }).when(this.remoteStorageManager)).copyLogSegmentData((RemoteLogSegmentMetadata) ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData) ArgumentMatchers.any(LogSegmentData.class));
        Partition mockPartition = mockPartition(this.leaderTopicIdPartition);
        Mockito.when(Long.valueOf(this.mockLog.onlyLocalLogSegmentsSize())).thenReturn(175L, new Long[]{100L});
        Mockito.when(Integer.valueOf(logSegment3.size())).thenReturn(100);
        Mockito.when(Long.valueOf(this.mockLog.onlyLocalLogSegmentsCount())).thenReturn(1L);
        Assertions.assertThrows(NoSuchElementException.class, () -> {
            yammerMetricValue("RemoteCopyLagBytes,topic=Leader");
        });
        Assertions.assertThrows(NoSuchElementException.class, () -> {
            yammerMetricValue("RemoteCopyLagSegments,topic=Leader");
        });
        Assertions.assertThrows(NoSuchElementException.class, () -> {
            yammerMetricValue("RemoteLogSizeComputationTime,topic=Leader");
        });
        Assertions.assertEquals(0L, yammerMetricValue("RemoteCopyLagBytes"));
        Assertions.assertEquals(0L, yammerMetricValue("RemoteCopyLagSegments"));
        Assertions.assertEquals(0L, yammerMetricValue("RemoteLogSizeComputationTime"));
        this.remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition), Collections.emptySet(), this.topicIds);
        TestUtils.waitForCondition(() -> {
            return 75 == safeLongYammerMetricValue("RemoteCopyLagBytes") && 75 == safeLongYammerMetricValue("RemoteCopyLagBytes,topic=Leader");
        }, String.format("Expected to find 75 for RemoteCopyLagBytes metric value, but found %d for topic 'Leader' and %d for all topics.", Long.valueOf(safeLongYammerMetricValue("RemoteCopyLagBytes,topic=Leader")), Long.valueOf(safeLongYammerMetricValue("RemoteCopyLagBytes"))));
        TestUtils.waitForCondition(() -> {
            return 1 == safeLongYammerMetricValue("RemoteCopyLagSegments") && 1 == safeLongYammerMetricValue("RemoteCopyLagSegments,topic=Leader");
        }, String.format("Expected to find 1 for RemoteCopyLagSegments metric value, but found %d for topic 'Leader' and %d for all topics.", Long.valueOf(safeLongYammerMetricValue("RemoteCopyLagSegments,topic=Leader")), Long.valueOf(safeLongYammerMetricValue("RemoteCopyLagSegments"))));
        countDownLatch2.countDown();
        TestUtils.waitForCondition(() -> {
            return safeLongYammerMetricValue("RemoteLogSizeComputationTime") >= 1000 && safeLongYammerMetricValue("RemoteLogSizeComputationTime,topic=Leader") >= 1000;
        }, String.format("Expected to find 1000 for RemoteLogSizeComputationTime metric value, but found %d for topic 'Leader' and %d for all topics.", Long.valueOf(safeLongYammerMetricValue("RemoteLogSizeComputationTime,topic=Leader")), Long.valueOf(safeLongYammerMetricValue("RemoteLogSizeComputationTime"))));
        countDownLatch.countDown();
    }

    /**
     * Reads the current value of the first registered Yammer gauge whose MBean name ends with
     * the given suffix.
     *
     * @param str suffix the metric's MBean name has to end with
     * @return the value reported by the matching gauge
     * @throws NoSuchElementException if no registered metric has a matching MBean name
     */
    private Object yammerMetricValue(String str) {
        for (Map.Entry<?, ?> candidate : KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
            MetricName metricName = (MetricName) candidate.getKey();
            if (metricName.getMBeanName().endsWith(str)) {
                return ((Gauge<?>) candidate.getValue()).value();
            }
        }
        // Same failure mode as Optional.get() on an empty result.
        throw new NoSuchElementException("No value present");
    }

    /**
     * Variant of {@link #yammerMetricValue(String)} that yields {@code 0} instead of failing
     * when no metric with the given MBean-name suffix is registered.
     *
     * @param str suffix the metric's MBean name has to end with
     * @return the gauge value as a {@code long}, or {@code 0L} if the metric is not found
     */
    private long safeLongYammerMetricValue(String str) {
        long value = 0L;
        try {
            value = (Long) yammerMetricValue(str);
        } catch (NoSuchElementException ignored) {
            // Metric is not registered; keep the default of zero.
        }
        return value;
    }

    /**
     * Checks that a failed upload is counted by both the copy-request and the failed-copy-request
     * meters, per topic and across all topics.
     *
     * <p>The remote storage manager mock throws from {@code copyLogSegmentData}, so the single
     * upload attempt triggered by {@code copyLogSegmentsToRemote} fails.
     */
    @Test
    void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());

        // Leader epoch cache backed by the shared checkpoint fixture.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        File segmentFile = TestUtils.tempFile();
        File producerSnapshotFile = TestUtils.tempFile();
        File indexDir = TestUtils.tempDirectory();

        // One rolled segment [0, 150) followed by the active segment starting at 150.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);

        FileRecords fileRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(segmentFile);
        Mockito.when(fileRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(150L);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager producerStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);

        // Real index objects for the rolled segment.
        OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        txnIndexFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(0L, txnIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIndex);

        // Metadata registration succeeds immediately; the actual data copy blows up.
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.doThrow(new RuntimeException()).when(this.remoteStorageManager).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));

        // Nothing has been attempted yet.
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote(this.mockLog);

        // Exactly one copy was attempted and the remote offset was still reported to the log.
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));
        Mockito.verify(this.mockLog).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());

        // One request and one failure, at both granularities.
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Checks that a task in follower mode never registers metadata or uploads segment data,
     * while still reporting the highest remote offset to the local log.
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Exception {
        // Leader epoch cache backed by the shared checkpoint fixture.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // One rolled segment at offset 0 followed by the active segment at offset 150.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);

        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToFollower();
        task.copyLogSegmentsToRemote(this.mockLog);

        // No part of the upload pipeline may have been touched.
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never()).addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        Mockito.verify(this.remoteStorageManager, Mockito.never()).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never()).updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        Mockito.verify(this.mockLog).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
    }

    /**
     * Checks that a leader task uploads nothing and records no copy metrics when the metadata
     * manager throws {@link ReplicaNotAvailableException} from {@code highestOffsetForEpoch}.
     */
    @Test
    void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() throws Exception {
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());

        // Leader epoch cache backed by the shared checkpoint fixture.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        // The metadata manager rejects every lookup.
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenThrow(new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + this.leaderTopicIdPartition));

        // One rolled segment at offset 0 followed by the active segment at offset 150.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);

        // Baseline: no copy requests and no failures.
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);
        task.run();

        // The upload pipeline must be untouched.
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never()).addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        Mockito.verify(this.remoteStorageManager, Mockito.never()).copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never()).updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        Mockito.verify(this.mockLog, Mockito.never()).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());

        // The rejected lookup is not counted as a copy attempt or as a copy failure.
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Asserts that the given segment metadata describes a segment of the leader partition that
     * is in {@code COPY_SEGMENT_STARTED} state, was produced by broker 0, and has the expected
     * offsets and leader-epoch boundaries.
     *
     * <p>The first and last entries of {@code map}, in its iteration order, are compared with the
     * first and last entries of the segment's leader-epoch map. Expected maps of any size are
     * supported; for an empty map only the size is compared.
     *
     * @param remoteLogSegmentMetadata metadata under test
     * @param j expected start offset
     * @param j2 expected end offset
     * @param map expected leader epochs, iterated in ascending epoch order
     */
    private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j, long j2, Map<Integer, Long> map) {
        Assertions.assertEquals(this.leaderTopicIdPartition, remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals(j, remoteLogSegmentMetadata.startOffset());
        Assertions.assertEquals(j2, remoteLogSegmentMetadata.endOffset());

        NavigableMap<?, ?> segmentLeaderEpochs = remoteLogSegmentMetadata.segmentLeaderEpochs();
        Assertions.assertEquals(map.size(), segmentLeaderEpochs.size());

        // Pick the boundary entries in one pass so that one-entry and many-entry maps both work.
        Map.Entry<Integer, Long> expectedFirst = null;
        Map.Entry<Integer, Long> expectedLast = null;
        for (Map.Entry<Integer, Long> expected : map.entrySet()) {
            if (expectedFirst == null) {
                expectedFirst = expected;
            }
            expectedLast = expected;
        }
        if (expectedFirst != null) {
            Assertions.assertEquals(expectedFirst, segmentLeaderEpochs.firstEntry());
            Assertions.assertEquals(expectedLast, segmentLeaderEpochs.lastEntry());
        }

        Assertions.assertEquals(0, remoteLogSegmentMetadata.brokerId());
        Assertions.assertEquals(RemoteLogSegmentState.COPY_SEGMENT_STARTED, remoteLogSegmentMetadata.state());
    }

    /**
     * Asserts that the given metadata update targets the leader partition, was produced by
     * broker 0 and moves the segment to {@code COPY_SEGMENT_FINISHED}.
     *
     * @param remoteLogSegmentMetadataUpdate update under test
     */
    private void verifyRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        final int expectedBrokerId = 0;
        final RemoteLogSegmentState expectedState = RemoteLogSegmentState.COPY_SEGMENT_FINISHED;

        Assertions.assertEquals(
            this.leaderTopicIdPartition,
            remoteLogSegmentMetadataUpdate.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals(expectedBrokerId, remoteLogSegmentMetadataUpdate.brokerId());
        Assertions.assertEquals(expectedState, remoteLogSegmentMetadataUpdate.state());
    }

    /**
     * Asserts that the segment data handed to the remote storage manager points at the expected
     * local files and carries the expected serialized leader-epoch entries.
     *
     * <p>All paths are compared in absolute form.
     *
     * @param logSegmentData data under test
     * @param offsetIndex expected offset index
     * @param timeIndex expected time index
     * @param transactionIndex expected transaction index; must be present in {@code logSegmentData}
     * @param file expected log segment file
     * @param file2 expected producer snapshot file
     * @param list leader-epoch entries expected in the leader-epoch index buffer
     * @throws IOException declared for callers; propagated from the helpers used here
     */
    private void verifyLogSegmentData(LogSegmentData logSegmentData, OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex transactionIndex, File file, File file2, List<EpochEntry> list) throws IOException {
        Assertions.assertEquals(offsetIndex.file().getAbsolutePath(), logSegmentData.offsetIndex().toAbsolutePath().toString());
        Assertions.assertEquals(timeIndex.file().getAbsolutePath(), logSegmentData.timeIndex().toAbsolutePath().toString());

        // Fail with an assertion error rather than NoSuchElementException when the index is missing.
        Assertions.assertTrue(logSegmentData.transactionIndex().isPresent(), "Expected a transaction index in the log segment data");
        // Use the absolute path on both sides, like the other file comparisons.
        Assertions.assertEquals(transactionIndex.file().getAbsolutePath(), logSegmentData.transactionIndex().get().toAbsolutePath().toString());

        Assertions.assertEquals(file.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString());
        Assertions.assertEquals(file2.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());
        Assertions.assertEquals(RemoteLogManager.epochEntriesAsByteBuffer(list), logSegmentData.leaderEpochIndex());
    }

    /**
     * Checks that {@code storageManager()} exposes exactly the instance returned by
     * {@code createRemoteStorageManager()} when that instance is already a
     * {@link ClassLoaderAwareRemoteStorageManager}.
     *
     * <p>The manager under test is closed by try-with-resources whether or not the assertion
     * passes.
     */
    @Test
    void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
        final ClassLoaderAwareRemoteStorageManager classLoaderAwareRemoteStorageManager = Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        try (RemoteLogManager remoteLogManager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                topicPartition -> Optional.empty(),
                (topicPartition, offset) -> { },
                this.brokerTopicStats, this.metrics) {
            public RemoteStorageManager createRemoteStorageManager() {
                return classLoaderAwareRemoteStorageManager;
            }
        }) {
            Assertions.assertEquals(classLoaderAwareRemoteStorageManager, remoteLogManager.storageManager());
        }
    }

    /**
     * Asserts that fetching segment metadata succeeds for every given partition, i.e. the
     * manager can resolve each of them.
     *
     * @param topicIdPartitionArr partitions expected to be known to the manager
     */
    private void verifyInCache(TopicIdPartition... topicIdPartitionArr) {
        for (TopicIdPartition partition : topicIdPartitionArr) {
            Assertions.assertDoesNotThrow(() -> {
                return this.remoteLogManager.fetchRemoteLogSegmentMetadata(partition.topicPartition(), 0, 0L);
            });
        }
    }

    /**
     * Asserts that fetching segment metadata fails with {@link KafkaException} for every given
     * partition, i.e. the manager cannot resolve any of them.
     *
     * @param topicIdPartitionArr partitions expected to be unknown to the manager
     */
    private void verifyNotInCache(TopicIdPartition... topicIdPartitionArr) {
        for (TopicIdPartition partition : topicIdPartitionArr) {
            Assertions.assertThrows(KafkaException.class, () -> {
                this.remoteLogManager.fetchRemoteLogSegmentMetadata(partition.topicPartition(), 0, 0L);
            });
        }
    }

    /**
     * Follows the manager's partition lookup through a leadership change and two partition
     * stops: partitions become resolvable after the leadership change and stop being resolvable
     * once they are stopped.
     */
    @Test
    void testTopicIdCacheUpdates() throws RemoteStorageException {
        this.remoteLogManager.startup();
        Partition leader = mockPartition(this.leaderTopicIdPartition);
        Partition follower = mockPartition(this.followerTopicIdPartition);
        Mockito.when(this.remoteLogMetadataManager.remoteLogSegmentMetadata(
                ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong()))
            .thenReturn(Optional.empty());

        // Before any leadership change neither partition is known.
        verifyNotInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);

        // The leadership change is forwarded once and makes both partitions resolvable.
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leader), Collections.singleton(follower), this.topicIds);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1))
            .onPartitionLeadershipChanges(Collections.singleton(this.leaderTopicIdPartition), Collections.singleton(this.followerTopicIdPartition));
        verifyInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);

        // Stopping the leader partition removes only that partition.
        StopPartition stopLeader = new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, true);
        this.remoteLogManager.stopPartitions(Collections.singleton(stopLeader), (tp, error) -> {
        });
        verifyNotInCache(this.leaderTopicIdPartition);
        verifyInCache(this.followerTopicIdPartition);

        // Stopping the follower partition leaves nothing resolvable.
        StopPartition stopFollower = new StopPartition(this.followerTopicIdPartition.topicPartition(), true, true);
        this.remoteLogManager.stopPartitions(Collections.singleton(stopFollower), (tp, error) -> {
        });
        verifyNotInCache(this.leaderTopicIdPartition, this.followerTopicIdPartition);
    }

    /**
     * Checks that metadata fetches for a leader and a follower partition are each delegated to
     * the metadata manager with the matching topic-id partition.
     */
    @Test
    void testFetchRemoteLogSegmentMetadata() throws RemoteStorageException {
        this.remoteLogManager.startup();
        Partition leader = mockPartition(this.leaderTopicIdPartition);
        Partition follower = mockPartition(this.followerTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leader), Collections.singleton(follower), this.topicIds);

        this.remoteLogManager.fetchRemoteLogSegmentMetadata(this.leaderTopicIdPartition.topicPartition(), 10, 100L);
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(this.followerTopicIdPartition.topicPartition(), 20, 200L);

        Mockito.verify(this.remoteLogMetadataManager)
            .remoteLogSegmentMetadata(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(this.remoteLogMetadataManager)
            .remoteLogSegmentMetadata(ArgumentMatchers.eq(this.followerTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    /**
     * Checks, on a spy of the manager, that a leadership change calls
     * {@code doHandleLeaderOrFollowerPartitions} for the follower partition and, after the spy
     * is reset, for the leader partition.
     */
    @Test
    void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
        this.remoteLogManager.startup();
        RemoteLogManager spiedManager = Mockito.spy(this.remoteLogManager);

        // Follower-only change.
        Partition follower = mockPartition(this.followerTopicIdPartition);
        spiedManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(follower), this.topicIds);
        Mockito.verify(spiedManager)
            .doHandleLeaderOrFollowerPartitions(ArgumentMatchers.eq(this.followerTopicIdPartition), ArgumentMatchers.any(Consumer.class));

        // Start the interaction history afresh before the leader-only change.
        Mockito.reset(spiedManager);
        Partition leader = mockPartition(this.leaderTopicIdPartition);
        spiedManager.onLeadershipChange(Collections.singleton(leader), Collections.emptySet(), this.topicIds);
        Mockito.verify(spiedManager)
            .doHandleLeaderOrFollowerPartitions(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.any(Consumer.class));
    }

    /**
     * Builds an uncompressed batch of three records whose timestamps are {@code j - 1},
     * {@code j + 1} and {@code j + 2}.
     *
     * <p>Payloads are encoded as UTF-8 explicitly so the bytes do not depend on the platform's
     * default charset.
     *
     * @param j reference timestamp the record timestamps are derived from
     * @param j2 initial offset passed to {@code MemoryRecords.withRecords}
     * @param i partition leader epoch passed to {@code MemoryRecords.withRecords}
     * @return the in-memory records
     */
    private MemoryRecords records(long j, long j2, int i) {
        return MemoryRecords.withRecords(j2, Compression.NONE, Integer.valueOf(i),
            new SimpleRecord(j - 1, "first message".getBytes(StandardCharsets.UTF_8)),
            new SimpleRecord(j + 1, "second message".getBytes(StandardCharsets.UTF_8)),
            new SimpleRecord(j + 2, "third message".getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * Checks the leader flag of a task: not leader initially, leader after
     * {@code convertToLeader}, and not leader again after {@code convertToFollower}.
     */
    @Test
    void testRLMTaskShouldSetLeaderEpochCorrectly() {
        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        Assertions.assertFalse(task.isLeader());

        task.convertToLeader(1);
        Assertions.assertTrue(task.isLeader());

        task.convertToFollower();
        Assertions.assertFalse(task.isLeader());
    }

    /**
     * Looks up offsets by timestamp against one finished remote segment whose leader-epoch map
     * is {@code {10 -> 120}}, with a local epoch cache that also holds epoch 10 at offset 120.
     *
     * <p>With the fixture's record timestamps of ts-1, ts+1 and ts+2, a lookup at ts returns the
     * second record, a lookup at ts+2 returns the third, and a lookup at ts+3 returns nothing.
     */
    @Test
    void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
        this.remoteLogManager.startup();
        final TopicPartition tp = this.leaderTopicIdPartition.topicPartition();
        final long ts = this.time.milliseconds();
        final long startOffset = 120L;
        final int targetLeaderEpoch = 10;

        TreeMap<Integer, Long> segmentEpochs = new TreeMap<>();
        segmentEpochs.put(targetLeaderEpoch, startOffset);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(tp, this.checkpoint, this.scheduler);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(targetLeaderEpoch, startOffset);
        epochCache.assign(12, 500L);

        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);

        Assertions.assertEquals(
            Optional.of(new FileRecords.TimestampAndOffset(ts + 1, startOffset + 1, Optional.of(targetLeaderEpoch))),
            this.remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, epochCache));
        Assertions.assertEquals(
            Optional.of(new FileRecords.TimestampAndOffset(ts + 2, startOffset + 2, Optional.of(targetLeaderEpoch))),
            this.remoteLogManager.findOffsetByTimestamp(tp, ts + 2, startOffset, epochCache));
        Assertions.assertEquals(
            Optional.empty(),
            this.remoteLogManager.findOffsetByTimestamp(tp, ts + 3, startOffset, epochCache));
    }

    /**
     * Same lookups as {@code testFindOffsetByTimestamp}, but the remote segment's leader-epoch
     * map additionally contains {@code 9 -> 119}, an epoch the local epoch cache does not hold.
     * Every lookup is expected to return nothing.
     */
    @Test
    void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, RemoteStorageException {
        this.remoteLogManager.startup();
        final TopicPartition tp = this.leaderTopicIdPartition.topicPartition();
        final long ts = this.time.milliseconds();
        final long startOffset = 120L;
        final int targetLeaderEpoch = 10;

        TreeMap<Integer, Long> segmentEpochs = new TreeMap<>();
        segmentEpochs.put(targetLeaderEpoch - 1, startOffset - 1);
        segmentEpochs.put(targetLeaderEpoch, startOffset);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(tp, this.checkpoint, this.scheduler);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(targetLeaderEpoch, startOffset);
        epochCache.assign(12, 500L);

        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);

        for (long lookupTimestamp : new long[] {ts, ts + 2, ts + 3}) {
            Assertions.assertEquals(
                Optional.empty(),
                this.remoteLogManager.findOffsetByTimestamp(tp, lookupTimestamp, startOffset, epochCache));
        }
    }

    /**
     * Looks up an offset by timestamp when the only remote segment is still in
     * {@code COPY_SEGMENT_STARTED} state; the lookup is expected to return nothing.
     */
    @Test
    void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException {
        this.remoteLogManager.startup();
        final TopicPartition tp = this.leaderTopicIdPartition.topicPartition();
        final long ts = this.time.milliseconds();
        final long startOffset = 120L;
        final int targetLeaderEpoch = 10;

        TreeMap<Integer, Long> segmentEpochs = new TreeMap<>();
        segmentEpochs.put(targetLeaderEpoch, startOffset);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(tp, this.checkpoint, this.scheduler);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(targetLeaderEpoch, startOffset);
        epochCache.assign(12, 500L);

        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);

        Assertions.assertEquals(
            Optional.empty(),
            this.remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, epochCache));
    }

    /**
     * Shared fixture for the find-offset-by-timestamp tests.
     *
     * <p>Stubs one remote segment covering offsets {@code [j2, j2 + 2]} with max timestamp
     * {@code j + 2}, serves its index files and its record data from the mocked remote storage
     * manager, lists it only for leader epoch {@code i}, and finally makes this broker leader of
     * the partition.
     *
     * @param j reference timestamp for the segment's records
     * @param j2 start offset of the remote segment
     * @param i the only leader epoch for which the segment is listed
     * @param treeMap leader-epoch map reported by the segment metadata
     * @param remoteLogSegmentState state reported by the segment metadata
     * @throws IOException if the partition directory or the transaction index file cannot be created
     * @throws RemoteStorageException declared by the stubbed remote storage calls
     */
    private void doTestFindOffsetByTimestamp(long j, long j2, int i, TreeMap<Integer, Long> treeMap, RemoteLogSegmentState remoteLogSegmentState) throws IOException, RemoteStorageException {
        TopicPartition tp = this.leaderTopicIdPartition.topicPartition();

        // Metadata of the single remote segment.
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid());
        RemoteLogSegmentMetadata segmentMetadata = Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when(segmentMetadata.remoteLogSegmentId()).thenReturn(segmentId);
        Mockito.when(segmentMetadata.maxTimestampMs()).thenReturn(j + 2);
        Mockito.when(segmentMetadata.startOffset()).thenReturn(j2);
        Mockito.when(segmentMetadata.endOffset()).thenReturn(j2 + 2);
        Mockito.when(segmentMetadata.segmentLeaderEpochs()).thenReturn(treeMap);
        Mockito.when(segmentMetadata.state()).thenReturn(remoteLogSegmentState);

        // Local directory that backs the index files served by the mocked remote storage.
        File partitionDir = new File(this.logDir, tp.toString());
        Files.createDirectory(partitionDir.toPath());
        File txnIndexFile = new File(partitionDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix());
        txnIndexFile.createNewFile();

        Mockito.when(this.remoteStorageManager.fetchIndex(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(RemoteStorageManager.IndexType.class))).thenAnswer(invocation -> {
            RemoteLogSegmentMetadata requestedSegment = invocation.getArgument(0);
            RemoteStorageManager.IndexType requestedIndex = invocation.getArgument(1);
            int maxEntries = (int) (requestedSegment.endOffset() - requestedSegment.startOffset());
            // Both index files are created on every call, whichever index is asked for.
            OffsetIndex offsetIndex = new OffsetIndex(new File(partitionDir, requestedSegment.startOffset() + UnifiedLog.IndexFileSuffix()), requestedSegment.startOffset(), maxEntries * 8);
            TimeIndex timeIndex = new TimeIndex(new File(partitionDir, requestedSegment.startOffset() + UnifiedLog.TimeIndexFileSuffix()), requestedSegment.startOffset(), maxEntries * 12);
            // NOTE(review): the enum constants were chosen by matching each branch to the index
            // file it serves - confirm against RemoteStorageManager.IndexType.
            switch (requestedIndex) {
                case OFFSET:
                    return Files.newInputStream(offsetIndex.file().toPath());
                case TIMESTAMP:
                    return Files.newInputStream(timeIndex.file().toPath());
                case TRANSACTION:
                    return Files.newInputStream(txnIndexFile.toPath());
                default:
                    return null;
            }
        });

        // The segment is listed only for the requested leader epoch.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            Integer requestedEpoch = invocation.getArgument(1);
            if (requestedEpoch == i) {
                return Collections.singleton(segmentMetadata).iterator();
            }
            return Collections.emptyIterator();
        });

        // Segment data is rebuilt on every fetch, so each caller gets a fresh stream.
        Mockito.when(this.remoteStorageManager.fetchLogSegment(segmentMetadata, 0)).thenAnswer(invocation -> {
            return new ByteArrayInputStream(records(j, j2, i).buffer().array());
        });

        Mockito.when(this.mockLog.logEndOffset()).thenReturn(600L);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(this.leaderTopicIdPartition)), Collections.emptySet(), this.topicIds);
    }

    /**
     * Checks that closing the manager twice closes the remote storage manager and then the
     * remote log metadata manager exactly once each, in that order.
     */
    @Test
    void testIdempotentClose() throws IOException {
        for (int attempt = 0; attempt < 2; attempt++) {
            this.remoteLogManager.close();
        }

        InOrder closeOrder = Mockito.inOrder(this.remoteStorageManager, this.remoteLogMetadataManager);
        closeOrder.verify(this.remoteStorageManager, Mockito.times(1)).close();
        closeOrder.verify(this.remoteLogMetadataManager, Mockito.times(1)).close();
    }

    /**
     * Verifies that closing a {@link RemoteLogManager} removes every metric it registered.
     *
     * <p>Construction of {@code KafkaMetricsGroup} is intercepted: the first constructed instance is
     * checked against the remote-log-manager metric names, the second against the remote storage
     * thread pool metric names.
     */
    @Test
    public void testRemoveMetricsOnClose() throws IOException {
        // try-with-resources guarantees the construction mock is released on success and on failure.
        try (MockedConstruction<KafkaMetricsGroup> metricsGroupCtor = Mockito.mockConstruction(KafkaMetricsGroup.class)) {
            RemoteLogManager manager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                    topicPartition -> Optional.of(this.mockLog),
                    (topicPartition, offset) -> { },
                    this.brokerTopicStats, this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.4
                public RemoteStorageManager createRemoteStorageManager() {
                    return RemoteLogManagerTest.this.remoteStorageManager;
                }

                public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                    return RemoteLogManagerTest.this.remoteLogMetadataManager;
                }
            };
            // Close the manager so that its metrics get removed.
            manager.close();

            KafkaMetricsGroup rlmMetricsGroup = metricsGroupCtor.constructed().get(0);
            KafkaMetricsGroup threadPoolMetricsGroup = metricsGroupCtor.constructed().get(1);
            List<MetricName> rlmMetricNames = Arrays.asList(
                    RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC,
                    RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
            Set<String> threadPoolMetricNames = RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;

            // One gauge and one timer are registered on the first group, and both names are removed again.
            Mockito.verify(rlmMetricsGroup, Mockito.times(1))
                    .newGauge(ArgumentMatchers.any(MetricName.class), ArgumentMatchers.any());
            Mockito.verify(rlmMetricsGroup, Mockito.times(1))
                    .newTimer(ArgumentMatchers.any(MetricName.class), ArgumentMatchers.any(), ArgumentMatchers.any());
            rlmMetricNames.forEach(metricName -> Mockito.verify(rlmMetricsGroup).removeMetric(metricName));

            // One gauge per thread pool metric name is registered on the second group, and each is removed.
            Mockito.verify(threadPoolMetricsGroup, Mockito.times(threadPoolMetricNames.size()))
                    .newGauge(ArgumentMatchers.anyString(), ArgumentMatchers.any());
            threadPoolMetricNames.forEach(metricName -> Mockito.verify(threadPoolMetricsGroup).removeMetric(metricName));

            Mockito.verifyNoMoreInteractions(rlmMetricsGroup);
            Mockito.verifyNoMoreInteractions(threadPoolMetricsGroup);
        }
    }

    /**
     * Builds a {@code COPY_SEGMENT_FINISHED} segment metadata for partition 0 of topic {@code "topic"},
     * using a freshly generated topic id and segment id.
     *
     * @param j   start offset handed to the metadata constructor
     * @param j2  end offset handed to the metadata constructor
     * @param map segment leader epochs handed to the metadata constructor
     * @return the new metadata with fixed timestamps (100000), broker id 1 and size 1000
     */
    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long j, long j2, Map<Integer, Long> map) {
        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0));
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(partition, Uuid.randomUuid());
        return new RemoteLogSegmentMetadata(
                segmentId,
                j,
                j2,
                100000L,
                1,
                100000L,
                1000,
                Optional.empty(),
                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                map);
    }

    /**
     * Checks {@code RemoteLogManager.buildFilteredLeaderEpochMap}: in the input several epochs share a
     * start offset (0, 1, 2 at offset 0; 5, 6 at offset 60; 7, 8 at offset 70), and the expected
     * result keeps only the highest epoch for each shared start offset.
     */
    @Test
    public void testBuildFilteredLeaderEpochMap() {
        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
        leaderEpochToStartOffset.put(0, 0L);
        leaderEpochToStartOffset.put(1, 0L);
        leaderEpochToStartOffset.put(2, 0L);
        leaderEpochToStartOffset.put(3, 30L);
        leaderEpochToStartOffset.put(4, 40L);
        leaderEpochToStartOffset.put(5, 60L);
        leaderEpochToStartOffset.put(6, 60L);
        leaderEpochToStartOffset.put(7, 70L);
        leaderEpochToStartOffset.put(8, 70L);

        TreeMap<Integer, Long> expectedFiltered = new TreeMap<>();
        expectedFiltered.put(2, 0L);
        expectedFiltered.put(3, 30L);
        expectedFiltered.put(4, 40L);
        expectedFiltered.put(6, 60L);
        expectedFiltered.put(8, 70L);

        Assertions.assertEquals(expectedFiltered, RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset));
    }

    /**
     * Checks {@code RemoteLogManager.isRemoteSegmentWithinLeaderEpochs} against the leader epoch lineage
     * {0->0, 1->10, 2->20, 3->30, 4->40, 5->50, 7->70} with a log end offset of 90.
     *
     * <p>Each case builds a segment via {@code createRemoteLogSegmentMetadata(startOffset, endOffset, epochs)};
     * the comments describe how the segment's data relates to the leader lineage.
     */
    @Test
    public void testRemoteSegmentWithinLeaderEpochs() {
        final long logEndOffset = 90L;
        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
        leaderEpochToStartOffset.put(0, 0L);
        leaderEpochToStartOffset.put(1, 10L);
        leaderEpochToStartOffset.put(2, 20L);
        leaderEpochToStartOffset.put(3, 30L);
        leaderEpochToStartOffset.put(4, 40L);
        leaderEpochToStartOffset.put(5, 50L);
        leaderEpochToStartOffset.put(7, 70L);

        // Several consecutive epochs, all present in the leader lineage: accepted.
        TreeMap<Integer, Long> multipleEpochs = new TreeMap<>();
        multipleEpochs.put(1, 15L);
        multipleEpochs.put(2, 20L);
        multipleEpochs.put(3, 30L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 35L, multipleEpochs), logEndOffset, leaderEpochToStartOffset));

        // Single epoch, ending at 19 right before leader epoch 2 starts at 20: accepted.
        TreeMap<Integer, Long> singleEpoch = new TreeMap<>();
        singleEpoch.put(1, 15L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 19L, singleEpoch), logEndOffset, leaderEpochToStartOffset));

        // Segment in the first leader epoch: accepted.
        TreeMap<Integer, Long> firstEpoch = new TreeMap<>();
        firstEpoch.put(0, 0L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(0L, 5L, firstEpoch), logEndOffset, leaderEpochToStartOffset));

        // Segment in the last leader epoch: accepted.
        TreeMap<Integer, Long> lastEpoch = new TreeMap<>();
        lastEpoch.put(7, 70L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(70L, 75L, lastEpoch), logEndOffset, leaderEpochToStartOffset));

        // Two epochs, ending at 29 right before leader epoch 3 starts at 30: accepted.
        TreeMap<Integer, Long> endsAtEpochBoundary = new TreeMap<>();
        endsAtEpochBoundary.put(1, 15L);
        endsAtEpochBoundary.put(2, 20L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 29L, endsAtEpochBoundary), logEndOffset, leaderEpochToStartOffset));

        // Segment contains epoch 6, which the leader lineage does not have: rejected.
        TreeMap<Integer, Long> unknownEpoch = new TreeMap<>();
        unknownEpoch.put(5, 55L);
        unknownEpoch.put(6, 60L);
        unknownEpoch.put(7, 70L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(55L, 85L, unknownEpoch), logEndOffset, leaderEpochToStartOffset));

        // Segment jumps from epoch 2 to epoch 4 although the leader has epoch 3 in between: rejected.
        TreeMap<Integer, Long> skippedEpoch = new TreeMap<>();
        skippedEpoch.put(1, 15L);
        skippedEpoch.put(2, 20L);
        skippedEpoch.put(4, 40L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 45L, skippedEpoch), logEndOffset, leaderEpochToStartOffset));

        // End offset 95 is larger than the log end offset: rejected.
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 95L, leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));

        // End offset 90 is equal to the log end offset: rejected.
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 90L, leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));

        // Segment's epoch 1 starts at offset 5, before the leader's epoch 1 start offset of 10: rejected.
        TreeMap<Integer, Long> startsBeforeLeaderEpoch = new TreeMap<>();
        startsBeforeLeaderEpoch.put(1, 5L);
        startsBeforeLeaderEpoch.put(2, 20L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(5L, 25L, startsBeforeLeaderEpoch), logEndOffset, leaderEpochToStartOffset));

        // Segment ends at 35 in epoch 2, but the leader already moved to epoch 3 at offset 30: rejected.
        TreeMap<Integer, Long> endsPastLeaderEpoch = new TreeMap<>();
        endsPastLeaderEpoch.put(1, 15L);
        endsPastLeaderEpoch.put(2, 20L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 35L, endsPastLeaderEpoch), logEndOffset, leaderEpochToStartOffset));
    }

    /**
     * Checks {@code RemoteLogManager.isRemoteSegmentWithinLeaderEpochs} when the leader lineage
     * ({7->51, 9->100}, log end offset 210) starts later than some of the remote segments.
     */
    @Test
    public void testRemoteSegmentWithinLeaderEpochsForOverlappingSegments() {
        final long logEndOffset = 210L;
        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
        leaderEpochToStartOffset.put(7, 51L);
        leaderEpochToStartOffset.put(9, 100L);

        // Segment starts before the leader lineage (epoch 5, offset 14) but overlaps epochs 7 and 9: accepted.
        TreeMap<Integer, Long> overlapsLineageStart = new TreeMap<>();
        overlapsLineageStart.put(5, 14L);
        overlapsLineageStart.put(7, 15L);
        overlapsLineageStart.put(9, 100L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(14L, 150L, overlapsLineageStart), logEndOffset, leaderEpochToStartOffset));

        // Segment only holds epochs 2 and 3, none of which appear in the leader lineage: rejected.
        TreeMap<Integer, Long> onlyOlderEpochs = new TreeMap<>();
        onlyOlderEpochs.put(2, 5L);
        onlyOlderEpochs.put(3, 6L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(2L, 7L, onlyOlderEpochs), logEndOffset, leaderEpochToStartOffset));

        // Segment holds epoch 10 (absent from the leader) and ends at 250, beyond the log end offset: rejected.
        TreeMap<Integer, Long> beyondLogEnd = new TreeMap<>();
        beyondLogEnd.put(7, 15L);
        beyondLogEnd.put(9, 100L);
        beyondLogEnd.put(10, 200L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 250L, beyondLogEnd), logEndOffset, leaderEpochToStartOffset));

        // Segment holds epoch 8, which lies between leader epochs 7 and 9 but is absent from the lineage: rejected.
        TreeMap<Integer, Long> missingMiddleEpoch = new TreeMap<>();
        missingMiddleEpoch.put(8, 75L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(75L, 100L, missingMiddleEpoch), logEndOffset, leaderEpochToStartOffset));

        // Segment's epoch 9 starts at 101 while the leader's epoch 9 starts at 100: rejected.
        TreeMap<Integer, Long> mismatchedEpochStart = new TreeMap<>();
        mismatchedEpochStart.put(7, 15L);
        mismatchedEpochStart.put(9, 101L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(15L, 150L, mismatchedEpochStart), logEndOffset, leaderEpochToStartOffset));

        // Segment's epoch 9 starts at 99, before the leader's epoch 9 start offset of 100: rejected.
        TreeMap<Integer, Long> startsBeforeLeaderEpoch = new TreeMap<>();
        startsBeforeLeaderEpoch.put(9, 99L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(
                createRemoteLogSegmentMetadata(99L, 150L, startsBeforeLeaderEpoch), logEndOffset, leaderEpochToStartOffset));
    }

    /**
     * With three segments (base offsets 5, 10, 15) and the offsets 5 and 20 passed to
     * {@code candidateLogSegments}, only the first two segments are expected as candidates, each paired
     * with the base offset of the segment that follows it. The last segment (presumably the active one,
     * per the test name) is expected to be left out.
     */
    @Test
    public void testCandidateLogSegmentsSkipsActiveSegment() {
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        LogSegment segment1 = Mockito.mock(LogSegment.class);
        LogSegment segment2 = Mockito.mock(LogSegment.class);
        LogSegment lastSegment = Mockito.mock(LogSegment.class);
        Mockito.when(segment1.baseOffset()).thenReturn(5L);
        Mockito.when(segment2.baseOffset()).thenReturn(10L);
        Mockito.when(lastSegment.baseOffset()).thenReturn(15L);
        Mockito.when(log.logSegments(5L, Long.MAX_VALUE))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, lastSegment)));

        // RLMTask is created against the shared manager instance (qualified inner-class creation).
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        List<RemoteLogManager.EnrichedLogSegment> expected = Arrays.asList(
                new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
                new RemoteLogManager.EnrichedLogSegment(segment2, 15L));
        Assertions.assertEquals(expected, task.candidateLogSegments(log, 5L, 20L));
    }

    /**
     * With four segments (base offsets 5, 10, 15, 20) and the offsets 5 and 15 passed to
     * {@code candidateLogSegments}, only the first two segments are expected as candidates, each paired
     * with the base offset of the segment that follows it. The upper offset 15 is presumably the last
     * stable offset, per the test name.
     */
    @Test
    public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        LogSegment segment1 = Mockito.mock(LogSegment.class);
        LogSegment segment2 = Mockito.mock(LogSegment.class);
        LogSegment segment3 = Mockito.mock(LogSegment.class);
        LogSegment lastSegment = Mockito.mock(LogSegment.class);
        Mockito.when(segment1.baseOffset()).thenReturn(5L);
        Mockito.when(segment2.baseOffset()).thenReturn(10L);
        Mockito.when(segment3.baseOffset()).thenReturn(15L);
        Mockito.when(lastSegment.baseOffset()).thenReturn(20L);
        Mockito.when(log.logSegments(5L, Long.MAX_VALUE))
                .thenReturn(JavaConverters.collectionAsScalaIterable(
                        Arrays.asList(segment1, segment2, segment3, lastSegment)));

        // RLMTask is created against the shared manager instance (qualified inner-class creation).
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        List<RemoteLogManager.EnrichedLogSegment> expected = Arrays.asList(
                new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
                new RemoteLogManager.EnrichedLogSegment(segment2, 15L));
        Assertions.assertEquals(expected, task.candidateLogSegments(log, 5L, 15L));
    }

    /**
     * Every listed argument pair must make the {@code RemoteLogManager.RetentionSizeData} constructor
     * throw {@link IllegalArgumentException}.
     */
    @Test
    public void testRemoteSizeData() {
        List<Supplier<RemoteLogManager.RetentionSizeData>> invalidInputs = Arrays.asList(
                () -> new RemoteLogManager.RetentionSizeData(10L, 0L),
                () -> new RemoteLogManager.RetentionSizeData(10L, -1L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, 10L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, -1L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, 0L));
        for (Supplier<RemoteLogManager.RetentionSizeData> invalidInput : invalidInputs) {
            Assertions.assertThrows(IllegalArgumentException.class, invalidInput::get);
        }
    }

    /**
     * A negative value in either argument position must make the
     * {@code RemoteLogManager.RetentionTimeData} constructor throw {@link IllegalArgumentException}.
     */
    @Test
    public void testRemoteSizeTime() {
        List<Supplier<RemoteLogManager.RetentionTimeData>> invalidInputs = Arrays.asList(
                () -> new RemoteLogManager.RetentionTimeData(-1L, 10L),
                () -> new RemoteLogManager.RetentionTimeData(10L, -1L));
        for (Supplier<RemoteLogManager.RetentionTimeData> invalidInput : invalidInputs) {
            Assertions.assertThrows(IllegalArgumentException.class, invalidInput::get);
        }
    }

    /**
     * Stopping the leader and follower partitions with the last {@code StopPartition} flag set to
     * {@code false} must cancel both tasks and notify the metadata manager once, without deleting any
     * segment data or updating any segment metadata.
     */
    @Test
    public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
        this.remoteLogManager.startup();
        // The error handler must never fire in this scenario.
        BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, error) -> {
            Assertions.fail("shouldn't be called");
        };
        Set<StopPartition> partitionsToStop = new HashSet<>();
        partitionsToStop.add(new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, false));
        partitionsToStop.add(new StopPartition(this.followerTopicIdPartition.topicPartition(), true, false));

        this.remoteLogManager.onLeadershipChange(
                Collections.singleton(mockPartition(this.leaderTopicIdPartition)),
                Collections.singleton(mockPartition(this.followerTopicIdPartition)),
                this.topicIds);
        Assertions.assertNotNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNotNull(this.remoteLogManager.task(this.followerTopicIdPartition));

        this.remoteLogManager.stopPartitions(partitionsToStop, errorHandler);

        Assertions.assertNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNull(this.remoteLogManager.task(this.followerTopicIdPartition));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).onStopPartitions(ArgumentMatchers.any());
        Mockito.verify(this.remoteStorageManager, Mockito.times(0)).deleteLogSegmentData(ArgumentMatchers.any());
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(0)).updateRemoteLogSegmentMetadata(ArgumentMatchers.any());
    }

    /**
     * Stopping the leader and follower partitions with the last {@code StopPartition} flag set to
     * {@code true} must cancel both tasks, notify the metadata manager once, and delete the remote
     * segments of both partitions.
     */
    @Test
    public void testStopPartitionsWithDeletion() throws RemoteStorageException {
        this.remoteLogManager.startup();
        // The error handler must never fire in this scenario; include the error to ease debugging.
        BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, error) -> {
            Assertions.fail("shouldn't be called: " + error);
        };
        Set<StopPartition> partitionsToStop = new HashSet<>();
        partitionsToStop.add(new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, true));
        partitionsToStop.add(new StopPartition(this.followerTopicIdPartition.topicPartition(), true, true));

        this.remoteLogManager.onLeadershipChange(
                Collections.singleton(mockPartition(this.leaderTopicIdPartition)),
                Collections.singleton(mockPartition(this.followerTopicIdPartition)),
                this.topicIds);
        Assertions.assertNotNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNotNull(this.remoteLogManager.task(this.followerTopicIdPartition));

        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition)))
                .thenReturn(listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 5, 100, 1024,
                        RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.followerTopicIdPartition)))
                .thenReturn(listRemoteLogSegmentMetadata(this.followerTopicIdPartition, 3, 100, 1024,
                        RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
        // Metadata updates complete immediately.
        CompletableFuture<Void> completedUpdate = CompletableFuture.completedFuture(null);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any()))
                .thenReturn(completedUpdate);

        this.remoteLogManager.stopPartitions(partitionsToStop, errorHandler);

        Assertions.assertNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNull(this.remoteLogManager.task(this.followerTopicIdPartition));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).onStopPartitions(ArgumentMatchers.any());
        // 8 deletions and 16 metadata updates: presumably 5 + 3 segments with two updates per segment.
        Mockito.verify(this.remoteStorageManager, Mockito.times(8)).deleteLogSegmentData(ArgumentMatchers.any());
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(16)).updateRemoteLogSegmentMetadata(ArgumentMatchers.any());
    }

    /**
     * With epochs {0->0, 1->250, 2->550} in the checkpoint and two remote segments (500..539 and
     * 540..700) returned only for the lookup whose second argument is 1, {@code findLogStartOffset}
     * is expected to return 500, the start offset of the first remote segment.
     */
    @Test
    public void testFindLogStartOffset() throws RemoteStorageException, IOException {
        List<EpochEntry> epochEntries = new ArrayList<>();
        epochEntries.add(new EpochEntry(0, 0L));
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(
                new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));

        long timestamp = this.time.milliseconds();
        RemoteLogSegmentMetadata firstSegment = new RemoteLogSegmentMetadata(
                new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()),
                500L, 539L, timestamp, 0, timestamp, 1024,
                truncateAndGetLeaderEpochs(epochEntries, 500L, 539L));
        RemoteLogSegmentMetadata secondSegment = new RemoteLogSegmentMetadata(
                new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()),
                540L, 700L, timestamp, 0, timestamp, 1024,
                truncateAndGetLeaderEpochs(epochEntries, 540L, 700L));
        List<RemoteLogSegmentMetadata> segments = Arrays.asList(firstSegment, secondSegment);
        // Only the lookup with second argument 1 (presumably the leader epoch) yields segments.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(
                        ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt()))
                .thenAnswer(invocation -> {
                    int requestedEpoch = invocation.getArgument(1);
                    return requestedEpoch == 1 ? segments.iterator() : Collections.emptyIterator();
                });

        // try-with-resources closes the manager whether or not the assertion passes.
        try (RemoteLogManager manager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                topicPartition -> Optional.of(this.mockLog),
                (topicPartition, offset) -> { },
                this.brokerTopicStats, this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.5
            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            Assertions.assertEquals(500L, manager.findLogStartOffset(this.leaderTopicIdPartition, this.mockLog));
        }
    }

    /**
     * When the metadata manager returns no remote segments for any lookup, {@code findLogStartOffset}
     * is expected to return the mocked local log start offset (250).
     */
    @Test
    public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty() throws RemoteStorageException, IOException {
        List<EpochEntry> epochEntries = new ArrayList<>();
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(
                new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(250L);
        // No remote segments exist, whatever the second lookup argument is.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(
                        ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt()))
                .thenReturn(Collections.emptyIterator());

        // try-with-resources closes the manager whether or not the assertion passes.
        try (RemoteLogManager manager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                topicPartition -> Optional.of(this.mockLog),
                (topicPartition, offset) -> { },
                this.brokerTopicStats, this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.6
            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            Assertions.assertEquals(250L, manager.findLogStartOffset(this.leaderTopicIdPartition, this.mockLog));
        }
    }

    /**
     * After a task is converted to leader and {@code copyLogSegmentsToRemote} runs, the log-start-offset
     * callback handed to the manager is expected to have received 600, the start offset of the only
     * remote segment (returned for the lookup whose second argument is 2).
     */
    @Test
    public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, IOException, InterruptedException {
        List<EpochEntry> epochEntries = new ArrayList<>();
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(
                new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));

        RemoteLogSegmentMetadata segmentMetadata = Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when(segmentMetadata.startOffset()).thenReturn(600L);
        // Only the lookup with second argument 2 (presumably the leader epoch) yields the segment.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(
                        ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt()))
                .thenAnswer(invocation -> {
                    int requestedEpoch = invocation.getArgument(1);
                    return requestedEpoch == 2
                            ? Collections.singletonList(segmentMetadata).iterator()
                            : Collections.emptyIterator();
                });

        // Captures the value pushed through the manager's log-start-offset callback.
        AtomicLong updatedLogStartOffset = new AtomicLong(0L);
        // try-with-resources closes the manager whether or not the assertion passes.
        try (RemoteLogManager manager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                topicPartition -> Optional.of(this.mockLog),
                (topicPartition, offset) -> updatedLogStartOffset.set(offset),
                this.brokerTopicStats, this.metrics) { // from class: kafka.log.remote.RemoteLogManagerTest.7
            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            RemoteLogManager.RLMTask task = manager.new RLMTask(this.leaderTopicIdPartition, 128);
            task.convertToLeader(4);
            task.copyLogSegmentsToRemote(this.mockLog);
            Assertions.assertEquals(600L, updatedLogStartOffset.get());
        }
    }

    /**
     * With retention configured so that either size or time is breached, cleanup is expected to delete
     * both remote segments, move the log start offset to 200 and count two successful delete requests.
     *
     * @param j  value stored under {@code retention.bytes}
     * @param j2 value stored under {@code retention.ms}
     */
    @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
    @CsvSource({"0, -1", "-1, 0"})
    public void testDeletionOnRetentionBreachedSegments(long j, long j2) throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", j);
        logProps.put("retention.ms", j2);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(
                new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(
                this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition))
                .thenReturn(segments.iterator());
        // A fresh iterator per call, since this lookup may be consumed more than once.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0))
                .thenAnswer(invocation -> segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                        ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                .thenReturn(CompletableFuture.runAsync(() -> { }));

        // No delete request has been recorded before the cleanup runs.
        String topic = this.leaderTopicIdPartition.topic();
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);
        task.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(200L, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(1));
        // Two successful deletions, no failures, on both the per-topic and the all-topics stats.
        Assertions.assertEquals(2L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(2L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
    }

    /**
     * Same scenario as a plain retention breach, but with two remote segments that share a start
     * offset: the second one ends 5 offsets later, comes from a different broker id and is 128 bytes
     * larger. Cleanup is expected to delete both and move the log start offset just past the longer
     * segment's end offset.
     *
     * @param j  value stored under {@code retention.bytes}
     * @param j2 value stored under {@code retention.ms}
     */
    @ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}")
    @CsvSource({"0, -1", "-1, 0"})
    public void testDeletionOnOverlappingRetentionBreachedSegments(long j, long j2) throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", j);
        logProps.put("retention.ms", j2);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(
                new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        RemoteLogSegmentMetadata baseSegment = listRemoteLogSegmentMetadata(
                this.leaderTopicIdPartition, 1, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED).get(0);
        // Overlapping copy of baseSegment: same start offset, longer range, other broker, bigger size.
        RemoteLogSegmentMetadata overlappingSegment = new RemoteLogSegmentMetadata(
                new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()),
                baseSegment.startOffset(),
                baseSegment.endOffset() + 5,
                baseSegment.maxTimestampMs(),
                baseSegment.brokerId() + 1,
                baseSegment.eventTimestampMs(),
                baseSegment.segmentSizeInBytes() + 128,
                baseSegment.customMetadata(),
                baseSegment.state(),
                baseSegment.segmentLeaderEpochs());
        List<RemoteLogSegmentMetadata> segments = new ArrayList<>();
        segments.add(overlappingSegment);
        segments.add(baseSegment);

        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition))
                .thenReturn(segments.iterator());
        // A fresh iterator per call, since this lookup may be consumed more than once.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0))
                .thenAnswer(invocation -> segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                        ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                .thenReturn(CompletableFuture.runAsync(() -> { }));

        // No delete request has been recorded before the cleanup runs.
        String topic = this.leaderTopicIdPartition.topic();
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);
        task.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(overlappingSegment.endOffset() + 1, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(1));
        // Two successful deletions, no failures, on both the per-topic and the all-topics stats.
        Assertions.assertEquals(2L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(2L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
    }

    /**
     * Checks the remote-delete lag metrics while two retention-breached remote segments are being deleted.
     *
     * <p>Two 1024-byte segments are listed for the leader partition. The lag metrics must read zero before
     * the cleanup starts, {@code 2048 bytes / 2 segments} when the first delete call reaches the remote
     * storage manager and {@code 1024 bytes / 1 segment} when the second one does.
     *
     * @param j  value used for the {@code retention.bytes} topic config
     * @param j2 value used for the {@code retention.ms} topic config
     */
    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
    @CsvSource({"0, -1", "-1, 0"})
    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long j, long j2) throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", j);
        logProps.put("retention.ms", j2);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(segments.iterator());
        // A fresh iterator per call: the epoch-scoped listing may be requested more than once.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocation -> segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(CompletableFuture.runAsync(() -> {
        }));

        // The metrics are sampled from inside the delete call itself, i.e. before that segment is accounted as deleted.
        Mockito.doAnswer(invocation -> {
            verifyRemoteDeleteMetrics(2048L, 2L);
            return Optional.empty();
        }).doAnswer(invocation -> {
            verifyRemoteDeleteMetrics(1024L, 1L);
            return Optional.empty();
        }).when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        verifyRemoteDeleteMetrics(0L, 0L);
        task.convertToLeader(0);
        task.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(200L, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(1));
    }

    /**
     * Runs two size-based cleanup passes over a remote segment listing that mixes segments in
     * {@code COPY_SEGMENT_STARTED}, {@code DELETE_SEGMENT_STARTED}, {@code DELETE_SEGMENT_FINISHED} and
     * {@code COPY_SEGMENT_FINISHED} states, and checks which of them are handed to the remote storage
     * manager for deletion.
     *
     * <p>{@code retention.bytes} is 10 * 1024 and every segment is 1024 bytes; eleven
     * {@code COPY_SEGMENT_FINISHED} segments are always part of the listing.
     */
    @Test
    public void testRemoteLogSizeRetentionShouldFilterOutCopySegmentStartState() throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", 1024L * 10);
        logProps.put("retention.ms", -1L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(2000L);

        // Three segments covering offsets [0, 99], each in a different non-finished-copy state.
        RemoteLogSegmentMetadata copyStartedAtZero = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 0L, 99L, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
        RemoteLogSegmentMetadata deleteFinished = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 0L, 99L, 1024, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
        RemoteLogSegmentMetadata deleteStarted = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 0L, 99L, 1024, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
        // A copy in progress for [200, 299] and the same segment (same id and offsets) once its copy has finished.
        RemoteLogSegmentMetadata copyStarted = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 200L, 299L, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
        RemoteLogSegmentMetadata copyFinished = createRemoteLogSegmentMetadata(copyStarted.remoteLogSegmentId(), copyStarted.startOffset(), copyStarted.endOffset(), 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        List<RemoteLogSegmentMetadata> finishedSegments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 11, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);

        // ---- First pass ----
        List<RemoteLogSegmentMetadata> firstListing = new LinkedList<>();
        firstListing.addAll(Arrays.asList(copyStartedAtZero, deleteFinished, deleteStarted, copyStarted));
        firstListing.addAll(finishedSegments);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(firstListing.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenReturn(firstListing.iterator()).thenReturn(firstListing.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(CompletableFuture.runAsync(() -> {
        }));
        Mockito.doNothing().when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(this.epochEntry2.epoch);
        task.cleanupExpiredRemoteLogSegments();

        // Exactly two deletions: the COPY_SEGMENT_STARTED segment at [0, 99] and the DELETE_SEGMENT_STARTED one.
        // The DELETE_SEGMENT_FINISHED segment must not be deleted again.
        Mockito.verify(this.remoteStorageManager, Mockito.times(2)).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(copyStartedAtZero);
        Mockito.verify(this.remoteStorageManager, Mockito.never()).deleteLogSegmentData(deleteFinished);
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(deleteStarted);

        // ---- Second pass: the [200, 299] segment is now listed as COPY_SEGMENT_FINISHED ----
        Mockito.clearInvocations(this.remoteStorageManager);
        List<RemoteLogSegmentMetadata> secondListing = new LinkedList<>();
        secondListing.addAll(Arrays.asList(deleteFinished, copyFinished));
        secondListing.addAll(finishedSegments);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(secondListing.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocation -> secondListing.iterator());
        Mockito.doNothing().when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));

        task.cleanupExpiredRemoteLogSegments();

        // Exactly two deletions again: the now-finished [200, 299] segment and the first of the eleven finished ones.
        Mockito.verify(this.remoteStorageManager, Mockito.times(2)).deleteLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(copyFinished);
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(finishedSegments.get(0));
    }

    /**
     * Cancels a cleanup task part-way through and checks that a second task finishes the job.
     *
     * <p>The first task is cancelled from inside the stubbed {@code updateRemoteLogSegmentMetadata} call,
     * so only the first of two segments is expected to be deleted by it. A second task, created for
     * {@code followerTopicIdPartition} and converted to leader, is then given a listing that starts at the
     * second segment and is expected to delete it.
     */
    @Test
    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
        RemoteLogManager.RLMTask leaderTask = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        leaderTask.convertToLeader(0);

        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocation -> segments.iterator());

        this.checkpoint.write(epochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));

        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", -1L);
        logProps.put("retention.ms", 0L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        // Cancel the task as a side effect of the metadata update so the cancellation lands mid-cleanup.
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(invocation -> {
            leaderTask.cancel();
            return CompletableFuture.runAsync(() -> {
            });
        });

        leaderTask.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(200L, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
        Mockito.verify(this.remoteStorageManager, Mockito.never()).deleteLogSegmentData(segments.get(1));

        // Second task: its listings skip the first segment, which the cancelled task already handled.
        RemoteLogManager.RLMTask followerTask = this.remoteLogManager.new RLMTask(this.followerTopicIdPartition, 128);
        followerTask.convertToLeader(1);

        Iterator<RemoteLogSegmentMetadata> fullListing = segments.iterator();
        fullListing.next();
        Iterator<RemoteLogSegmentMetadata> firstEpochListing = segments.iterator();
        firstEpochListing.next();
        Iterator<RemoteLogSegmentMetadata> secondEpochListing = segments.iterator();
        secondEpochListing.next();
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.followerTopicIdPartition)).thenReturn(fullListing);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.followerTopicIdPartition, 0)).thenReturn(firstEpochListing).thenReturn(secondEpochListing);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(invocation -> CompletableFuture.runAsync(() -> {
        }));

        followerTask.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(200L, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(1));
    }

    /**
     * Checks what happens when the remote storage manager fails to delete an expired segment.
     *
     * <p>The first cleanup run must propagate the {@link RemoteStorageException}, leave the log start
     * offset at 100 and count one delete request and one failed delete request, both for the topic and
     * for all topics. The delete stub is then switched to succeed and the cleanup is run a second time.
     *
     * @param j  value used for the {@code retention.bytes} topic config
     * @param j2 value used for the {@code retention.ms} topic config
     */
    @ParameterizedTest(name = "testFailedDeleteExpiredSegments retentionSize={0} retentionMs={1}")
    @CsvSource({"0, -1", "-1, 0"})
    public void testFailedDeleteExpiredSegments(long j, long j2) throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", j);
        logProps.put("retention.ms", j2);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));

        this.checkpoint.write(Collections.singletonList(this.epochEntry0));
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 1, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocation -> segments.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(CompletableFuture.runAsync(() -> {
        }));

        // All delete counters start at zero.
        String topic = this.leaderTopicIdPartition.topic();
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);

        // First run: the remote delete fails and the failure must surface to the caller.
        Mockito.doThrow(new RemoteStorageException("Failed to delete segment")).when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any());
        Assertions.assertThrows(RemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
        Assertions.assertEquals(100L, this.currentLogStartOffset.get());
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));

        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());

        // Second run: the remote delete is stubbed to succeed.
        Mockito.doNothing().when(this.remoteStorageManager).deleteLogSegmentData(ArgumentMatchers.any());
        task.cleanupExpiredRemoteLogSegments();
        Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(segments.get(0));
    }

    /**
     * Size-based retention: with {@code i} remote segments of 1024 bytes and 512 bytes of local-only log,
     * {@code retention.bytes} is chosen so that exactly {@code i2} remote segments exceed it. The actual
     * run and the assertions are delegated to {@code verifyDeleteLogSegment}.
     *
     * @param i  number of remote segments to list
     * @param i2 number of those segments expected to be deleted
     */
    @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
    @CsvSource({"50, 0", "50, 1", "50, 23", "50, 50"})
    public void testDeleteLogSegmentDueToRetentionSizeBreach(int i, int i2) throws RemoteStorageException, ExecutionException, InterruptedException {
        List<EpochEntry> epochEntries = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 20L), new EpochEntry(3, 50L), new EpochEntry(4, 100L));
        this.checkpoint.write(epochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        EpochEntry latestEntry = epochEntries.get(epochEntries.size() - 1);
        int currentLeaderEpoch = latestEntry.epoch;

        // Room for the non-deletable segments plus the 512 bytes of local-only log.
        long retentionBytes = ((i - i2) * 1024) + 512;
        long localLogStartOffset = i * 100;
        long logEndOffset = (i * 100) + 1;

        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", retentionBytes);
        logProps.put("retention.ms", -1L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.tp);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);
        Mockito.when(this.mockLog.onlyLocalLogSegmentsSize()).thenReturn(512L);

        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, i, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        verifyDeleteLogSegment(segments, i2, currentLeaderEpoch);
    }

    /**
     * Time-based retention: {@code retention.ms} is 1 and the first {@code i2} of {@code i} remote segments
     * are generated with a timestamp one millisecond older than the rest. The actual run and the
     * assertions are delegated to {@code verifyDeleteLogSegment}.
     *
     * @param i  number of remote segments to list
     * @param i2 number of those segments expected to be deleted
     */
    @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}")
    @CsvSource({"50, 0", "50, 1", "50, 23", "50, 50"})
    public void testDeleteLogSegmentDueToRetentionTimeBreach(int i, int i2) throws RemoteStorageException, ExecutionException, InterruptedException {
        List<EpochEntry> epochEntries = Arrays.asList(new EpochEntry(0, 0L), new EpochEntry(1, 20L), new EpochEntry(3, 50L), new EpochEntry(4, 100L));
        this.checkpoint.write(epochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler);
        EpochEntry latestEntry = epochEntries.get(epochEntries.size() - 1);
        int currentLeaderEpoch = latestEntry.epoch;

        long localLogStartOffset = i * 100;
        long logEndOffset = (i * 100) + 1;

        Map<String, Long> logProps = new HashMap<>();
        logProps.put("retention.bytes", -1L);
        logProps.put("retention.ms", 1L);
        Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.tp);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);
        Mockito.when(this.mockLog.onlyLocalLogSegmentsSize()).thenReturn(512L);

        List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadataByTime(this.leaderTopicIdPartition, i, i2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        verifyDeleteLogSegment(segments, i2, currentLeaderEpoch);
    }

    /**
     * Asserts the current values of the remote-delete lag metrics, both the aggregate ones and the ones
     * tagged with {@code topic=Leader}.
     *
     * @param j  expected value of the {@code RemoteDeleteLagBytes} metrics
     * @param j2 expected value of the {@code RemoteDeleteLagSegments} metrics
     */
    private void verifyRemoteDeleteMetrics(long j, long j2) {
        assertRemoteDeleteLagMetric("RemoteDeleteLagBytes", "RemoteDeleteLagBytes", j);
        assertRemoteDeleteLagMetric("RemoteDeleteLagSegments", "RemoteDeleteLagSegments", j2);
        assertRemoteDeleteLagMetric("RemoteDeleteLagBytes,topic=Leader", "RemoteDeleteLagBytes for 'Leader' topic", j);
        assertRemoteDeleteLagMetric("RemoteDeleteLagSegments,topic=Leader", "RemoteDeleteLagSegments for 'Leader' topic", j2);
    }

    /**
     * Reads one metric a single time and asserts it, so the value quoted in the failure message is the
     * very value that was compared.
     *
     * @param metricName  name passed to {@code safeLongYammerMetricValue}
     * @param description how the metric is named in the failure message
     * @param expected    expected metric value
     */
    private void assertRemoteDeleteLagMetric(String metricName, String description, long expected) {
        long actual = safeLongYammerMetricValue(metricName);
        Assertions.assertEquals(expected, actual, String.format("Expected to find %d for %s metric value, but found %d", expected, description, actual));
    }

    /**
     * Runs one cleanup pass as leader over the given remote segments and verifies how many of them were
     * deleted.
     *
     * <p>When at least one deletion is expected, the first and last captured deletions must be the first
     * and the {@code i}-th segment of {@code list}, and the log start offset must have moved to just past
     * the last deleted segment.
     *
     * @param list remote segments returned by the stubbed metadata manager
     * @param i    number of segments expected to be passed to {@code deleteLogSegmentData}
     * @param i2   leader epoch the task is converted to leader with
     */
    private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> list, int i, int i2) throws RemoteStorageException, ExecutionException, InterruptedException {
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(list.iterator());
        // The epoch-scoped listing only returns segments whose leader-epoch map contains the requested epoch.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            int leaderEpoch = invocation.<Integer>getArgument(1);
            return list.stream()
                    .filter(segment -> segment.segmentLeaderEpochs().containsKey(leaderEpoch))
                    .iterator();
        });
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(invocation -> CompletableFuture.runAsync(() -> {
        }));

        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(i2);
        task.cleanupExpiredRemoteLogSegments();

        ArgumentCaptor<RemoteLogSegmentMetadata> deletedCaptor = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(i)).deleteLogSegmentData(deletedCaptor.capture());
        if (i <= 0) {
            return;
        }

        List<RemoteLogSegmentMetadata> deleted = deletedCaptor.getAllValues();
        RemoteLogSegmentMetadata lastExpected = list.get(i - 1);
        Assertions.assertEquals(list.get(0), deleted.get(0));
        Assertions.assertEquals(lastExpected, deleted.get(deleted.size() - 1));
        // Expected value first so that a failure message reads correctly.
        Assertions.assertEquals(lastExpected.endOffset() + 1, this.currentLogStartOffset.get());
    }

    /**
     * Checks that segments already in {@code DELETE_SEGMENT_FINISHED} state are left alone by the cleanup:
     * the remote storage manager must see no interaction and the log start offset callback must never
     * move the tracked offset away from 0.
     *
     * <p>The manager under test is a dedicated instance whose factory methods return this test's mocks;
     * it is managed by try-with-resources so that it is closed even when an assertion fails.
     */
    @Test
    public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException {
        AtomicLong logStartOffset = new AtomicLong(0L);
        try (RemoteLogManager remoteLogManager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                partition -> Optional.of(this.mockLog),
                (partition, offset) -> logStartOffset.set(offset),
                this.brokerTopicStats, this.metrics) {
            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
            task.convertToLeader(0);

            Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
            Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

            List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
            List<RemoteLogSegmentMetadata> segments = listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
            Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(segments.iterator());
            Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenReturn(segments.iterator()).thenReturn(segments.iterator());

            this.checkpoint.write(epochEntries);
            Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.tp, this.checkpoint, this.scheduler)));

            Map<String, Long> logProps = new HashMap<>();
            logProps.put("retention.bytes", -1L);
            logProps.put("retention.ms", 0L);
            Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(logProps));
            Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(invocation -> CompletableFuture.runAsync(() -> {
            }));

            task.cleanupExpiredRemoteLogSegments();

            Mockito.verifyNoMoreInteractions(this.remoteStorageManager);
            Assertions.assertEquals(0L, logStartOffset.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Convenience overload that passes an empty epoch list to the six-argument
     * {@code listRemoteLogSegmentMetadata}.
     *
     * @param topicIdPartition      partition the segments belong to
     * @param i                     number of segments to create
     * @param i2                    offsets covered by each segment
     * @param i3                    value forwarded as the per-segment size argument
     * @param remoteLogSegmentState state assigned to every segment
     * @return the generated segment metadata
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int i, int i2, int i3, RemoteLogSegmentState remoteLogSegmentState) {
        List<EpochEntry> noExplicitEpochs = Collections.emptyList();
        return listRemoteLogSegmentMetadata(topicIdPartition, i, i2, i3, noExplicitEpochs, remoteLogSegmentState);
    }

    /**
     * Generates consecutive segment metadata with none of the segments given the older timestamp, by
     * delegating to {@code listRemoteLogSegmentMetadataByTime} with an older-segment count of zero.
     *
     * @param topicIdPartition      partition the segments belong to
     * @param i                     number of segments to create
     * @param i2                    offsets covered by each segment
     * @param i3                    value forwarded as the per-segment size argument
     * @param list                  epoch entries to derive each segment's leader epochs from
     * @param remoteLogSegmentState state assigned to every segment
     * @return the generated segment metadata
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int i, int i2, int i3, List<EpochEntry> list, RemoteLogSegmentState remoteLogSegmentState) {
        final int olderSegmentCount = 0;
        return listRemoteLogSegmentMetadataByTime(topicIdPartition, i, olderSegmentCount, i2, i3, list, remoteLogSegmentState);
    }

    /**
     * Generates {@code i} back-to-back remote segments, each covering {@code i3} offsets starting at
     * offset 0. The first {@code i2} segments are stamped one millisecond before the current test time,
     * the rest with the current test time.
     *
     * @param topicIdPartition      partition the segments belong to
     * @param i                     number of segments to create
     * @param i2                    number of leading segments that get the older timestamp
     * @param i3                    offsets covered by each segment
     * @param i4                    value passed as the per-segment size argument of the metadata constructor
     * @param list                  epoch entries to derive leader epochs from; when empty,
     *                              {@code totalEpochEntries} is used instead
     * @param remoteLogSegmentState state assigned to every segment
     * @return the generated segment metadata, ordered by start offset
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition, int i, int i2, int i3, int i4, List<EpochEntry> list, RemoteLogSegmentState remoteLogSegmentState) {
        // Loop-invariant: which epoch entries the per-segment epoch maps are cut from.
        List<EpochEntry> epochEntries = list.isEmpty() ? this.totalEpochEntries : list;
        List<RemoteLogSegmentMetadata> segments = new ArrayList<>();
        for (int idx = 0; idx < i; idx++) {
            long now = this.time.milliseconds();
            long timestamp = idx < i2 ? now - 1 : now;
            // Widen before multiplying so large segment counts or sizes cannot overflow int.
            long startOffset = (long) idx * i3;
            long endOffset = startOffset + i3 - 1;
            Map<Integer, Long> segmentLeaderEpochs = truncateAndGetLeaderEpochs(epochEntries, startOffset, endOffset);
            segments.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), startOffset, endOffset, timestamp, 0, timestamp, i4, Optional.empty(), remoteLogSegmentState, segmentLeaderEpochs));
        }
        return segments;
    }

    /**
     * Builds a single segment metadata instance stamped with the current test time.
     *
     * @param remoteLogSegmentId    id of the segment
     * @param j                     start offset of the segment
     * @param j2                    end offset of the segment
     * @param i                     value passed as the segment size argument of the metadata constructor
     * @param list                  epoch entries, truncated to {@code [j, j2]} to form the leader-epoch map
     * @param remoteLogSegmentState state of the segment
     * @return the new metadata instance
     */
    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long j, long j2, int i, List<EpochEntry> list, RemoteLogSegmentState remoteLogSegmentState) {
        // The clock is read twice, in constructor-argument order, before the epoch map is built.
        long firstTimestamp = this.time.milliseconds();
        long secondTimestamp = this.time.milliseconds();
        Map<Integer, Long> segmentLeaderEpochs = truncateAndGetLeaderEpochs(list, j, j2);
        return new RemoteLogSegmentMetadata(remoteLogSegmentId, j, j2, firstTimestamp, 0, secondTimestamp, i, Optional.empty(), remoteLogSegmentState, segmentLeaderEpochs);
    }

    /**
     * Writes the given epoch entries to a temporary checkpoint file, truncates them from the start at
     * {@code l} and from the end at {@code l2} through a {@code LeaderEpochFileCache}, and returns what
     * the checkpoint then contains as an epoch-to-start-offset map.
     *
     * @param list epoch entries to start from
     * @param l    offset passed to {@code truncateFromStartAsyncFlush}
     * @param l2   offset passed to {@code truncateFromEndAsyncFlush}
     * @return start offset keyed by leader epoch, as read back from the checkpoint
     * @throws UncheckedIOException if an {@link IOException} is thrown along the way
     */
    private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> list, Long l, Long l2) {
        try {
            LeaderEpochCheckpointFile scratchCheckpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
            scratchCheckpoint.write(list);

            LeaderEpochFileCache scratchCache = new LeaderEpochFileCache((TopicPartition) null, scratchCheckpoint, this.scheduler);
            scratchCache.truncateFromStartAsyncFlush(l);
            scratchCache.truncateFromEndAsyncFlush(l2);

            return scratchCheckpoint.read().stream()
                    .collect(Collectors.toMap(entry -> entry.epoch, entry -> entry.startOffset));
        } catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /**
     * Checks the result of {@code RemoteLogManager.read} when no first batch can be found in the remote
     * segment: the manager under test overrides {@code findFirstBatch} to return {@code null}, and the
     * read is expected to yield empty records at fetch offset 0 with a present but empty list of aborted
     * transactions.
     *
     * <p>The manager is managed by try-with-resources so it is closed on both success and failure.
     */
    @Test
    public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException, IOException {
        FileInputStream fileInputStream = Mockito.mock(FileInputStream.class);
        final ClassLoaderAwareRemoteStorageManager rsmManager = Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        final RemoteLogSegmentMetadata segmentMetadata = Mockito.mock(RemoteLogSegmentMetadata.class);
        LeaderEpochFileCache epochCache = Mockito.mock(LeaderEpochFileCache.class);
        Mockito.when(epochCache.epochForOffset(ArgumentMatchers.anyLong())).thenReturn(OptionalInt.of(1));
        Mockito.when(this.remoteStorageManager.fetchLogSegment(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.anyInt())).thenAnswer(invocation -> fileInputStream);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, false, this.tp, new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0L, 10, Optional.empty()), FetchIsolation.TXN_COMMITTED, false);

        try (RemoteLogManager remoteLogManager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                partition -> Optional.of(this.mockLog),
                (partition, offset) -> {
                },
                this.brokerTopicStats, this.metrics) {
            public RemoteStorageManager createRemoteStorageManager() {
                return rsmManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epochForOffset, long offset) {
                return Optional.of(segmentMetadata);
            }

            public Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata currentMetadata, Option<LeaderEpochFileCache> leaderEpochFileCacheOption) {
                return Optional.empty();
            }

            int lookupPositionForOffset(RemoteLogSegmentMetadata currentMetadata, long offset) {
                return 1;
            }

            // The scenario under test: no first batch is found in the remote segment.
            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                return null;
            }
        }) {
            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
            Assertions.assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
            Assertions.assertFalse(fetchDataInfo.firstEntryIncomplete);
            Assertions.assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
            Assertions.assertTrue(fetchDataInfo.abortedTransactions.isPresent());
            Assertions.assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());
        }
    }

    /**
     * Reads from a {@link RemoteLogManager} whose first record batch (11 bytes) is one byte
     * larger than the fetch request's max bytes (10).
     *
     * <p>Expectations checked below: the returned fetch offset is 0, the first entry is not
     * flagged incomplete and no aborted transactions are reported. When {@code z} is
     * {@code true} the batch is written into a buffer whose capacity equals the batch size;
     * when {@code false} the batch is never written and the result is
     * {@link MemoryRecords#EMPTY}.
     *
     * @param z forwarded as the second {@code RemoteStorageFetchInfo} constructor argument
     *          (presumably the "min one message" flag; confirm against that class)
     * @throws RemoteStorageException declared by the remote storage calls used in the test
     * @throws IOException            declared by the remote log manager calls used in the test
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean z) throws RemoteStorageException, IOException {
        final int fetchMaxBytes = 10;
        final int recordBatchSizeInBytes = fetchMaxBytes + 1;

        FileInputStream fileInputStream = Mockito.mock(FileInputStream.class);
        final ClassLoaderAwareRemoteStorageManager rsmManager = Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        final RemoteLogSegmentMetadata segmentMetadata = Mockito.mock(RemoteLogSegmentMetadata.class);
        LeaderEpochFileCache epochCache = Mockito.mock(LeaderEpochFileCache.class);
        Mockito.when(epochCache.epochForOffset(ArgumentMatchers.anyLong())).thenReturn(OptionalInt.of(1));
        Mockito.when(this.remoteStorageManager.fetchLogSegment(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.anyInt()))
                .thenAnswer(invocation -> fileInputStream);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        final RecordBatch firstBatch = Mockito.mock(RecordBatch.class);
        final ArgumentCaptor<ByteBuffer> bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
        FetchRequest.PartitionData partitionData =
                new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0L, fetchMaxBytes, Optional.empty());
        RemoteStorageFetchInfo fetchInfo =
                new RemoteStorageFetchInfo(0, z, this.tp, partitionData, FetchIsolation.HIGH_WATERMARK, false);

        // try-with-resources closes the manager exactly once and keeps a failure from
        // close() as a suppressed exception instead of masking the primary one.
        try (RemoteLogManager remoteLogManager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                partition -> Optional.of(this.mockLog),
                (partition, offset) -> { },
                this.brokerTopicStats, this.metrics) {
            public RemoteStorageManager createRemoteStorageManager() {
                return rsmManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epoch, long offset) {
                return Optional.of(segmentMetadata);
            }

            int lookupPositionForOffset(RemoteLogSegmentMetadata metadata, long offset) {
                return 1;
            }

            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                // The batch reports a size above the fetch limit and records whatever buffer it is written to.
                Mockito.when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
                Mockito.doNothing().when(firstBatch).writeTo(bufferCaptor.capture());
                return firstBatch;
            }
        }) {
            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);

            // Assertions common to both parameter values.
            Assertions.assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
            Assertions.assertFalse(fetchDataInfo.firstEntryIncomplete);
            Assertions.assertEquals(Optional.empty(), fetchDataInfo.abortedTransactions);

            if (z) {
                // The oversized batch is still written, into a buffer sized to the batch.
                Assertions.assertEquals(recordBatchSizeInBytes, bufferCaptor.getValue().capacity());
            } else {
                // Nothing is written and no records are returned.
                Mockito.verify(firstBatch, Mockito.never()).writeTo(ArgumentMatchers.any(ByteBuffer.class));
                Assertions.assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
            }
        }
    }

    /**
     * Reads from a {@link RemoteLogManager} whose remote log input stream yields
     * {@code null} for the first {@code nextBatch()} call and an 11-byte batch (last offset 2)
     * for the next one, while the fetch request allows at most 10 bytes.
     *
     * <p>Expectations checked below: the returned fetch offset is 0, the first entry is not
     * flagged incomplete, no aborted transactions are reported, and the batch is written into
     * a buffer whose capacity equals the batch size.
     *
     * @throws RemoteStorageException declared by the remote storage calls used in the test
     * @throws IOException            declared by the remote log manager calls used in the test
     */
    @Test
    public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
        final int fetchMaxBytes = 10;
        final int recordBatchSizeInBytes = fetchMaxBytes + 1;

        FileInputStream fileInputStream = Mockito.mock(FileInputStream.class);
        final RemoteLogInputStream remoteLogInputStream = Mockito.mock(RemoteLogInputStream.class);
        final ClassLoaderAwareRemoteStorageManager rsmManager = Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        final RemoteLogSegmentMetadata segmentMetadata = Mockito.mock(RemoteLogSegmentMetadata.class);
        LeaderEpochFileCache epochCache = Mockito.mock(LeaderEpochFileCache.class);
        Mockito.when(epochCache.epochForOffset(ArgumentMatchers.anyLong())).thenReturn(OptionalInt.of(1));
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        RecordBatch firstBatch = Mockito.mock(RecordBatch.class);
        ArgumentCaptor<ByteBuffer> bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
        FetchRequest.PartitionData partitionData =
                new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0L, fetchMaxBytes, Optional.empty());

        Mockito.when(rsmManager.fetchLogSegment(ArgumentMatchers.any(), ArgumentMatchers.anyInt())).thenReturn(fileInputStream);
        Mockito.when(segmentMetadata.topicIdPartition()).thenReturn(new TopicIdPartition(Uuid.randomUuid(), this.tp));
        // First call yields no batch, the following call yields the oversized batch.
        Mockito.when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch);
        Mockito.when(firstBatch.lastOffset()).thenReturn(2L);
        Mockito.when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
        Mockito.doNothing().when(firstBatch).writeTo(bufferCaptor.capture());

        RemoteStorageFetchInfo fetchInfo =
                new RemoteStorageFetchInfo(0, true, this.tp, partitionData, FetchIsolation.HIGH_WATERMARK, false);

        // try-with-resources closes the manager exactly once and keeps a failure from
        // close() as a suppressed exception instead of closing twice.
        try (RemoteLogManager remoteLogManager = new RemoteLogManager(this.config, 0, this.logDir, "dummyId", this.time,
                partition -> Optional.of(this.mockLog),
                (partition, offset) -> { },
                this.brokerTopicStats, this.metrics) {
            public RemoteStorageManager createRemoteStorageManager() {
                return rsmManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epoch, long offset) {
                return Optional.of(segmentMetadata);
            }

            public RemoteLogInputStream getRemoteLogInputStream(InputStream inputStream) {
                return remoteLogInputStream;
            }

            int lookupPositionForOffset(RemoteLogSegmentMetadata metadata, long offset) {
                return 1;
            }
        }) {
            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);

            Assertions.assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
            Assertions.assertFalse(fetchDataInfo.firstEntryIncomplete);
            Assertions.assertEquals(Optional.empty(), fetchDataInfo.abortedTransactions);
            // The oversized batch is written into a buffer sized to the batch.
            Assertions.assertEquals(recordBatchSizeInBytes, bufferCaptor.getValue().capacity());
        }
    }

    /**
     * Checks the copy quota manager configuration derived from a {@link KafkaConfig}:
     * first with only the base remote-log settings (defaults expected), then with the
     * copy quota rate, window count and window size explicitly overridden.
     */
    @Test
    public void testCopyQuotaManagerConfig() {
        // Case 1: nothing overridden, so the defaults are expected.
        Properties defaultProps = new Properties();
        defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
        createRLMConfig(defaultProps);
        RLMQuotaManagerConfig defaultQuotaConfig =
                RemoteLogManager.copyQuotaManagerConfig(KafkaConfig.fromProps(defaultProps));
        Assertions.assertEquals(
                RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
                defaultQuotaConfig.quotaBytesPerSecond());
        Assertions.assertEquals(11, defaultQuotaConfig.numQuotaSamples());
        Assertions.assertEquals(1, defaultQuotaConfig.quotaWindowSizeSeconds());

        // Case 2: explicit copy quota settings are reflected in the derived configuration.
        Properties overriddenProps = new Properties();
        overriddenProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
        overriddenProps.put("remote.log.manager.copy.max.bytes.per.second", 100);
        overriddenProps.put("remote.log.manager.copy.quota.window.num", 31);
        overriddenProps.put("remote.log.manager.copy.quota.window.size.seconds", 1);
        createRLMConfig(overriddenProps);
        RLMQuotaManagerConfig overriddenQuotaConfig =
                RemoteLogManager.copyQuotaManagerConfig(KafkaConfig.fromProps(overriddenProps));
        Assertions.assertEquals(100L, overriddenQuotaConfig.quotaBytesPerSecond());
        Assertions.assertEquals(31, overriddenQuotaConfig.numQuotaSamples());
        Assertions.assertEquals(1, overriddenQuotaConfig.quotaWindowSizeSeconds());
    }

    /**
     * Checks the fetch quota manager configuration derived from a {@link KafkaConfig}:
     * first with only the base remote-log settings (defaults expected), then with the
     * fetch quota rate, window count and window size explicitly overridden.
     */
    @Test
    public void testFetchQuotaManagerConfig() {
        // Case 1: nothing overridden, so the defaults are expected.
        Properties defaultProps = new Properties();
        defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
        createRLMConfig(defaultProps);
        RLMQuotaManagerConfig defaultQuotaConfig =
                RemoteLogManager.fetchQuotaManagerConfig(KafkaConfig.fromProps(defaultProps));
        Assertions.assertEquals(
                RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
                defaultQuotaConfig.quotaBytesPerSecond());
        Assertions.assertEquals(11, defaultQuotaConfig.numQuotaSamples());
        Assertions.assertEquals(1, defaultQuotaConfig.quotaWindowSizeSeconds());

        // Case 2: explicit fetch quota settings are reflected in the derived configuration.
        Properties overriddenProps = new Properties();
        overriddenProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
        overriddenProps.put("remote.log.manager.fetch.max.bytes.per.second", 100);
        overriddenProps.put("remote.log.manager.fetch.quota.window.num", 31);
        overriddenProps.put("remote.log.manager.fetch.quota.window.size.seconds", 1);
        createRLMConfig(overriddenProps);
        RLMQuotaManagerConfig overriddenQuotaConfig =
                RemoteLogManager.fetchQuotaManagerConfig(KafkaConfig.fromProps(overriddenProps));
        Assertions.assertEquals(100L, overriddenQuotaConfig.quotaBytesPerSecond());
        Assertions.assertEquals(31, overriddenQuotaConfig.numQuotaSamples());
        Assertions.assertEquals(1, overriddenQuotaConfig.quotaWindowSizeSeconds());
    }

    /**
     * Serializes a single epoch entry (epoch 0, start offset 1) with
     * {@code RemoteLogManager.epochEntriesAsByteBuffer} and checks the UTF-8 text it produces
     * line by line: {@code "0"}, then the number of entries, then {@code "0 1"}.
     *
     * @throws Exception if serialization or reading the serialized bytes fails
     */
    @Test
    public void testEpochEntriesAsByteBuffer() throws Exception {
        List<EpochEntry> epochEntries = Arrays.asList(new EpochEntry(0, 1L));
        // Close the reader deterministically instead of leaving it to the garbage collector.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(RemoteLogManager.epochEntriesAsByteBuffer(epochEntries).array()),
                StandardCharsets.UTF_8))) {
            // First line: presumably the checkpoint format version (the test expects "0").
            Assertions.assertEquals(String.valueOf(0), reader.readLine());
            // Second line: number of serialized entries.
            Assertions.assertEquals(String.valueOf(epochEntries.size()), reader.readLine());
            // Third line: the single entry rendered as "<epoch> <startOffset>".
            Assertions.assertEquals("0 1", reader.readLine());
        }
    }

    /**
     * Exercises {@code RLMTask.copyLogSegmentsToRemote} with the copy quota either exceeded
     * or not, using the task prepared by {@link #setupRLMTask(boolean)}.
     *
     * <p>Quota not exceeded: the copy finishes within 100 ms, the quota is checked once,
     * 10 bytes are recorded, and the highest offset in remote storage is updated twice
     * (first -1, then 149).
     *
     * <p>Quota exceeded: the copy does not finish within 200 ms, so the preemptive timeout
     * raises {@link AssertionFailedError}, and the highest offset in remote storage is
     * updated only once, with -1.
     *
     * @param z whether the copy quota manager reports the quota as exceeded
     * @throws Exception if preparing the task fails
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testCopyQuota(boolean z) throws Exception {
        RemoteLogManager.RLMTask task = setupRLMTask(z);

        if (!z) {
            Assertions.assertTimeoutPreemptively(Duration.ofMillis(100L), () -> {
                task.copyLogSegmentsToRemote(this.mockLog);
            });

            Mockito.verify(this.rlmCopyQuotaManager, Mockito.times(1)).isQuotaExceeded();
            Mockito.verify(this.rlmCopyQuotaManager, Mockito.times(1)).record(10.0d);

            ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);
            Mockito.verify(this.mockLog, Mockito.times(2)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
            List<Long> capturedOffsets = offsetCaptor.getAllValues();
            Assertions.assertEquals(-1L, capturedOffsets.get(0).longValue());
            Assertions.assertEquals(149L, capturedOffsets.get(1).longValue());
            return;
        }

        // With the quota exceeded the copy is expected to still be running when the timeout fires.
        Assertions.assertThrows(AssertionFailedError.class, () ->
                Assertions.assertTimeoutPreemptively(Duration.ofMillis(200L), () -> {
                    task.copyLogSegmentsToRemote(this.mockLog);
                }));

        ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(this.mockLog, Mockito.times(1)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
        Assertions.assertEquals(-1L, offsetCaptor.getValue());
    }

    /**
     * Starts the remote log manager with the copy quota reported as exceeded, makes this
     * broker leader of the test partition, waits until the quota has been checked at least
     * once, and then requires {@code close()} to finish within 100 ms.
     *
     * @throws Exception if setup fails or waiting for the quota check is interrupted
     */
    @Test
    public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
        this.remoteLogManager.startup();
        // Only the mock configuration matters here; the returned task is not used.
        setupRLMTask(true);

        Partition leaderPartition = mockPartition(this.leaderTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet(), this.topicIds);

        TestUtils.waitForCondition(
            () -> {
                // verify() throws until the quota check has happened at least once.
                Mockito.verify(this.rlmCopyQuotaManager, Mockito.atLeast(1)).isQuotaExceeded();
                return true;
            },
            "Quota exceeded check did not happen");

        Assertions.assertTimeoutPreemptively(Duration.ofMillis(100L), () -> this.remoteLogManager.close());
    }

    /**
     * Configures the shared mocks for a copy-to-remote scenario and returns a leader
     * {@code RLMTask} for {@code leaderTopicIdPartition}.
     *
     * <p>The mocked log exposes two segments: one covering offsets 0 up to (but excluding) 150
     * with 10 bytes of data and real offset, time and transaction indexes, and an active
     * segment starting at offset 150. The metadata manager and storage manager mocks accept
     * every add, update and copy call.
     *
     * @param z value the copy quota manager mock returns from {@code isQuotaExceeded()}
     * @return a task created with the value 128 and converted to leader with epoch 2
     * @throws RemoteStorageException declared by the stubbed remote storage calls
     * @throws IOException            if creating the temporary index files fails
     */
    private RemoteLogManager.RLMTask setupRLMTask(boolean z) throws RemoteStorageException, IOException {
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());

        // Leader epoch cache backed by the test checkpoint.
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.mockLog.parentDir()).thenReturn("dir1");
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // Segment layout: [0, 150) followed by the active segment at 150.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        File segmentFile = TestUtils.tempFile();
        FileRecords fileRecords = Mockito.mock(FileRecords.class);
        Mockito.when(fileRecords.file()).thenReturn(segmentFile);
        Mockito.when(fileRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.log()).thenReturn(fileRecords);
        Mockito.when(oldSegment.baseOffset()).thenReturn(0L);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(150L);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        // Producer state snapshot and last stable offset.
        File producerSnapshotFile = TestUtils.tempFile();
        ProducerStateManager producerStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);

        // Real index files for the non-active segment.
        File indexDir = TestUtils.tempDirectory();
        OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File transactionIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex = new TransactionIndex(0L, transactionIndexFile);
        Mockito.when(oldSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(oldSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(oldSegment.txnIndex()).thenReturn(transactionIndex);

        // Metadata and storage managers succeed immediately.
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.empty());

        Mockito.when(this.rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(z);
        // record() is invoked with a double (see verify(...).record(10.0d) in testCopyQuota),
        // so the stub must use a double matcher; an int matcher never matches that call.
        Mockito.doNothing().when(this.rlmCopyQuotaManager).record(ArgumentMatchers.anyDouble());

        // RLMTask is an inner class of RemoteLogManager, so it is created through the manager instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);
        return task;
    }

    /**
     * Verifies that copying stops once the copy quota becomes exceeded.
     *
     * <p>The mocked log exposes three segments ([0, 100), [100, 150) and the active segment
     * at 150). The quota manager reports "not exceeded" on the first check and "exceeded"
     * afterwards. The copy is expected to still be running when the 200 ms preemptive timeout
     * fires, and the highest offset in remote storage must have been updated exactly twice:
     * first with -1, then with 99 (the last offset of the first segment).
     *
     * @throws Exception if creating the temporary files or index files fails
     */
    @Test
    public void testCopyThrottling() throws Exception {
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());

        // Leader epoch cache backed by the test checkpoint.
        this.checkpoint.write(this.totalEpochEntries);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint, this.scheduler)));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // Segment layout: [0, 100), [100, 150) and the active segment at 150; all share one 10-byte FileRecords mock.
        LogSegment firstSegment = Mockito.mock(LogSegment.class);
        LogSegment secondSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        File segmentFile = TestUtils.tempFile();
        FileRecords fileRecords = Mockito.mock(FileRecords.class);
        Mockito.when(fileRecords.file()).thenReturn(segmentFile);
        Mockito.when(fileRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(firstSegment.log()).thenReturn(fileRecords);
        Mockito.when(firstSegment.baseOffset()).thenReturn(0L);
        Mockito.when(firstSegment.readNextOffset()).thenReturn(100L);
        Mockito.when(secondSegment.log()).thenReturn(fileRecords);
        Mockito.when(secondSegment.baseOffset()).thenReturn(100L);
        Mockito.when(secondSegment.readNextOffset()).thenReturn(150L);
        Mockito.when(activeSegment.log()).thenReturn(fileRecords);
        Mockito.when(activeSegment.baseOffset()).thenReturn(150L);
        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(0L);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(firstSegment, secondSegment, activeSegment)));

        // Producer state snapshot and last stable offset.
        File producerSnapshotFile = TestUtils.tempFile();
        ProducerStateManager producerStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(producerStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(producerSnapshotFile));
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(producerStateManager);
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(250L);

        // Real index files; only the first segment has its indexes stubbed.
        File indexDir = TestUtils.tempDirectory();
        OffsetIndex offsetIndex = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, 0L, ""), 0L, 1000).get();
        TimeIndex timeIndex = LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, 0L, ""), 0L, 1500).get();
        File transactionIndexFile = UnifiedLog.transactionIndexFile(indexDir, 0L, "");
        transactionIndexFile.createNewFile();
        TransactionIndex transactionIndex = new TransactionIndex(0L, transactionIndexFile);
        Mockito.when(firstSegment.timeIndex()).thenReturn(timeIndex);
        Mockito.when(firstSegment.offsetIndex()).thenReturn(offsetIndex);
        Mockito.when(firstSegment.txnIndex()).thenReturn(transactionIndex);

        // Metadata and storage managers succeed immediately.
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completedFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.empty());

        // Quota is available for the first check and exceeded for every later one.
        Mockito.when(this.rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
        // record() is invoked with a double (see verify(...).record(10.0d) in testCopyQuota),
        // so the stub must use a double matcher; an int matcher never matches that call.
        Mockito.doNothing().when(this.rlmCopyQuotaManager).record(ArgumentMatchers.anyDouble());

        // RLMTask is an inner class of RemoteLogManager, so it is created through the manager instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);

        // The copy must still be running when the timeout fires.
        Assertions.assertThrows(AssertionFailedError.class, () ->
                Assertions.assertTimeoutPreemptively(Duration.ofMillis(200L), () -> {
                    task.copyLogSegmentsToRemote(this.mockLog);
                }));

        ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(this.mockLog, Mockito.times(2)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
        List<Long> capturedOffsets = offsetCaptor.getAllValues();
        Assertions.assertEquals(-1L, capturedOffsets.get(0).longValue());
        Assertions.assertEquals(99L, capturedOffsets.get(1).longValue());
    }

    /**
     * Checks the remote copy lag metrics across a leader-to-follower transition:
     * lag recorded while leader is visible in the topic stats, both lag metrics read 0
     * after the partition becomes a follower, and a later {@code recordLagStats} call on the
     * previously obtained task leaves them at 0.
     */
    @Test
    public void testTierLagResetsToZeroOnBecomingFollower() {
        String topic = this.leaderTopicIdPartition.topic();

        // Become leader and record some lag.
        this.remoteLogManager.startup();
        Partition asLeader = mockPartition(this.leaderTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(asLeader), Collections.emptySet(), this.topicIds);
        RemoteLogManager.RLMTask leaderTask = this.remoteLogManager.rlmTask(this.leaderTopicIdPartition);
        Assertions.assertNotNull(leaderTask);
        leaderTask.recordLagStats(1024L, 2L);
        Assertions.assertEquals(1024L, this.brokerTopicStats.topicStats(topic).remoteCopyLagBytes());
        Assertions.assertEquals(2L, this.brokerTopicStats.topicStats(topic).remoteCopyLagSegments());

        // Become follower: both lag metrics drop to zero.
        Partition asFollower = mockPartition(this.leaderTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(asFollower), this.topicIds);
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyLagBytes());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyLagSegments());

        // Recording through the old leader task no longer affects the metrics.
        leaderTask.recordLagStats(2048L, 4L);
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyLagBytes());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyLagSegments());
    }

    /**
     * Builds a mocked {@link Partition} for the given topic-id partition.
     *
     * @param topicIdPartition supplies the topic partition and topic name the mock reports
     * @return a partition mock whose {@code log()} yields a mocked {@link UnifiedLog} that
     *         reports remote logging as enabled
     */
    private Partition mockPartition(TopicIdPartition topicIdPartition) {
        TopicPartition tpOfMock = topicIdPartition.topicPartition();

        // Backing log with remote storage switched on.
        UnifiedLog remoteEnabledLog = Mockito.mock(UnifiedLog.class);
        Mockito.when(remoteEnabledLog.remoteLogEnabled()).thenReturn(true);

        Partition mockedPartition = Mockito.mock(Partition.class);
        Mockito.when(mockedPartition.topicPartition()).thenReturn(tpOfMock);
        Mockito.when(mockedPartition.topic()).thenReturn(tpOfMock.topic());
        Mockito.when(mockedPartition.log()).thenReturn(Option.apply(remoteEnabledLog));
        return mockedPartition;
    }

    /**
     * Adds the remote-log settings used by these tests to {@code properties} (mutating the
     * argument) and builds a {@link RemoteLogManagerConfig} from the result.
     *
     * @param properties property set to extend; entries already present under other keys are kept
     * @return a configuration created from the extended properties
     */
    private RemoteLogManagerConfig createRLMConfig(Properties properties) {
        // Enable tiered storage and plug in the no-op manager implementations.
        properties.put("remote.log.storage.system.enable", true);
        properties.put("remote.log.storage.manager.class.name", NoOpRemoteStorageManager.class.getName());
        properties.put("remote.log.metadata.manager.class.name", NoOpRemoteLogMetadataManager.class.getName());

        // Entry under the "rsm.config." prefix.
        properties.put("rsm.config.remote.log.storage.test", "storage.test");

        // Entries under the "rlmm.config." prefix.
        properties.put("rlmm.config.remote.log.metadata.test", "metadata.test");
        properties.put("rlmm.config.remote.log.metadata.topic.num.partitions", "1");
        properties.put("rlmm.config.remote.log.metadata.common.client.common.client.test", "common.test");
        properties.put("rlmm.config.remote.log.metadata.producer.producer.test", "producer.test");
        properties.put("rlmm.config.remote.log.metadata.consumer.consumer.test", "consumer.test");

        RemoteLogManagerConfig rlmConfig = new RemoteLogManagerConfig(properties);
        return rlmConfig;
    }
}
