/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import kafka.server.Fetching$;
import kafka.server.LeaderEndPoint;
import kafka.server.PartitionFetchState;
import kafka.server.ReplicaManager;
import kafka.server.TierStateMachine;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;

/**
 * {@link TierStateMachine} implementation that is driven with a {@link LeaderEndPoint} and a
 * {@link ReplicaManager}.
 *
 * <p>{@link #start} rebuilds the local auxiliary state of a partition (leader epoch cache and
 * producer snapshot) from the remote log segment that ends just before the leader's earliest
 * local offset, truncates the local log to start right after that segment, and returns a
 * fetch state positioned at that offset.
 */
public class ReplicaFetcherTierStateMachine implements TierStateMachine {
    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

    private final LeaderEndPoint leader;
    private final ReplicaManager replicaMgr;

    /**
     * @param leader     endpoint used to query the leader for offsets and epoch end offsets
     * @param replicaMgr replica manager used to look up the local log, the partition and the
     *                   remote log manager
     */
    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader, ReplicaManager replicaMgr) {
        this.leader = leader;
        this.replicaMgr = replicaMgr;
    }

    /**
     * Builds the remote log auxiliary state for the partition and returns the fetch state to
     * continue fetching from.
     *
     * @param topicPartition     the partition to build the state for
     * @param currentFetchState  the current fetch state; its topic id and leader epoch are carried
     *                           over into the returned state
     * @param fetchPartitionData the fetch response data; only its log start offset is read here
     * @return a new fetch state in the {@code Fetching} state, positioned at the offset following
     *         the remote segment used to build the state, with the initial lag set to the leader's
     *         latest offset minus that offset
     * @throws Exception if querying the leader or building the auxiliary state fails
     */
    @Override
    public PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, FetchResponseData.PartitionData fetchPartitionData) throws Exception {
        int currentLeaderEpoch = currentFetchState.currentLeaderEpoch();

        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch);
        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();

        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());

        // The leader's end offset is queried after the state was built, so the lag is relative
        // to the offset we are about to fetch from.
        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch);
        long leaderEndOffset = fetchLatestOffsetResult.offset();
        long initialLag = leaderEndOffset - offsetToFetch;

        return PartitionFetchState.apply(
            currentFetchState.topicId(),
            offsetToFetch,
            (Option<Object>) Option.apply((Object) initialLag),
            currentLeaderEpoch,
            Fetching$.MODULE$,
            replicaMgr.localLogOrException(topicPartition).latestEpoch());
    }

    /**
     * This implementation never advances the state; it returns the given fetch state unchanged.
     *
     * @return an {@link Optional} containing {@code currentFetchState}
     */
    @Override
    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition, PartitionFetchState currentFetchState) {
        return Optional.of(currentFetchState);
    }

    /**
     * Asks the leader for the end offset of the epoch preceding {@code epoch} ({@code epoch - 1}).
     *
     * @param epoch              the epoch whose predecessor is queried
     * @param partition          the partition to query
     * @param currentLeaderEpoch the current leader epoch sent along with the request
     * @return the epoch end offset reported by the leader
     * @throws KafkaException if the leader's response has no entry for the partition, or (via
     *                        {@link Errors#exception()}) if the entry carries an error code
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(int epoch, TopicPartition partition, int currentLeaderEpoch) {
        int previousEpoch = epoch - 1;

        HashMap<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
            .setPartition(partition.partition())
            .setCurrentLeaderEpoch(currentLeaderEpoch)
            .setLeaderEpoch(previousEpoch));

        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset =
            leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
        if (maybeEpochEndOffset.isEmpty()) {
            throw new KafkaException("No response received for partition: " + partition);
        }

        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
            throw Errors.forCode(epochEndOffset.errorCode()).exception();
        }
        return epochEndOffset;
    }

    /**
     * Fetches the leader epoch index of the given remote segment and parses it with the
     * {@link LeaderEpochCheckpointFile#FORMATTER}.
     *
     * @return the epoch entries read from the remote leader epoch checkpoint
     */
    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
        // Closing the reader also closes the wrapped remote index stream.
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer =
                new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
            return readBuffer.read();
        }
    }

    /**
     * Downloads the producer snapshot index of the given remote segment into {@code snapshotFile}.
     *
     * <p>The content is first copied into a sibling {@code .tmp} file and then moved onto the
     * final path, so the final path is only ever populated through a move.
     */
    private void buildProducerSnapshotFile(File snapshotFile, RemoteLogSegmentMetadata remoteLogSegmentMetadata, RemoteLogManager rlm) throws IOException, RemoteStorageException {
        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
        // Files.copy(InputStream, ...) does not close its source, so the remote index stream
        // has to be closed here to avoid leaking it.
        try (InputStream snapshotStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)) {
            Files.copy(snapshotStream, tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
    }

    /**
     * Rebuilds the leader epoch cache and the producer state of the local log from the remote
     * segment containing the offset just before the leader's local log start offset, and
     * truncates the local log to start right after that segment.
     *
     * @param topicPartition                    the partition to build the state for
     * @param currentLeaderEpoch                the current leader epoch
     * @param leaderLocalLogStartOffset         the leader's earliest local offset
     * @param epochForLeaderLocalLogStartOffset the leader epoch reported for that offset
     * @param leaderLogStartOffset              the leader's log start offset
     * @return the offset following the end offset of the remote segment that was used
     * @throws RemoteStorageException if remote storage is not enabled for the log, or no remote
     *                                segment metadata is found for the target epoch and offset
     * @throws IllegalStateException  if the replica manager has no remote log manager
     */
    private long buildRemoteLogAuxState(TopicPartition topicPartition, int currentLeaderEpoch, long leaderLocalLogStartOffset, int epochForLeaderLocalLogStartOffset, long leaderLogStartOffset) throws IOException, RemoteStorageException {
        UnifiedLog unifiedLog = replicaMgr.localLogOrException(topicPartition);

        if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) {
            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
        }
        if (replicaMgr.remoteLogManager().isEmpty()) {
            throw new IllegalStateException("RemoteLogManager is not yet instantiated");
        }
        RemoteLogManager rlm = (RemoteLogManager) replicaMgr.remoteLogManager().get();

        long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1L;

        // Determine which epoch the offset just before the leader's local start offset belongs to.
        // Epoch 0 has no predecessor to query; otherwise, if the previous epoch ends beyond that
        // offset, use the epoch the leader reported for the previous-epoch query.
        int targetEpoch;
        if (epochForLeaderLocalLogStartOffset == 0) {
            targetEpoch = epochForLeaderLocalLogStartOffset;
        } else {
            OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset =
                fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
            if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
                targetEpoch = earlierEpochEndOffset.leaderEpoch();
            } else {
                targetEpoch = epochForLeaderLocalLogStartOffset;
            }
        }

        Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
        if (!maybeRlsm.isPresent()) {
            throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition
                + ", currentLeaderEpoch: " + currentLeaderEpoch
                + ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset
                + ", leaderLogStartOffset: " + leaderLogStartOffset
                + ", epoch: " + targetEpoch
                + " as the previous remote log segment metadata was not found");
        }
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();

        // Local log resumes right after the last offset covered by the remote segment.
        long nextOffset = remoteLogSegmentMetadata.endOffset() + 1L;

        Partition partition = replicaMgr.getPartitionOrException(topicPartition);
        partition.truncateFullyAndStartAt(nextOffset, false, (Option<Object>) Option.apply((Object) leaderLogStartOffset));
        unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented);

        // Populate the leader epoch cache from the remote segment's epoch checkpoint.
        List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
        if (unifiedLog.leaderEpochCache().isDefined()) {
            ((LeaderEpochFileCache) unifiedLog.leaderEpochCache().get()).assign(epochs);
        }
        log.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);

        // Restore the producer snapshot for nextOffset, then reload the producer state from it.
        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
        buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
        unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
        unifiedLog.loadProducerState(nextOffset);
        log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}", partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);

        return nextOffset;
    }
}

