/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import kafka.tier.store.TierObjectStoreUtils;
import kafka.tier.tools.RecoveryUtils;
import kafka.tier.tools.TierMetadataValidator;
import kafka.tier.tools.TierObjectStoreFactory;
import kafka.tier.tools.TierRecoveryConfig;
import kafka.tier.tools.TierTopicMaterializationToolConfig;
import kafka.tier.tools.TierTopicMaterializationUtils;
import kafka.tier.tools.common.ComparatorInfo;
import kafka.tier.tools.common.FenceEventInfo;
import kafka.utils.CoreUtils;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;

public class TierMetadataComparator
implements AutoCloseable {
    private final Properties props;
    private final Path outputJsonFile;
    private final List<FenceEventInfo> fenceEvents;
    private final TierTopicMaterializationUtils materializationUtils;
    private final Map<TopicIdPartition, Long> fencedOffsetMap;
    private final CancellationContext cancellationContext;
    private final Optional<TierObjectStore> objStoreOpt;
    private final boolean offsetScanFlag;
    public static final List<String> REQUIRED_PROPERTIES = Arrays.asList("confluent.tier.recovery.broker.workdir.list", "confluent.tier.recovery.working.dir", "confluent.tier.recovery.validate", "confluent.tier.recovery.materialize");

    public TierMetadataComparator(Properties props, List<FenceEventInfo> fenceEvents, Path outputJsonFile, Scheduler scheduler) {
        this.props = props;
        this.fenceEvents = fenceEvents;
        this.fencedOffsetMap = TierMetadataComparator.generateOffsetMapFromInput(fenceEvents);
        this.materializationUtils = new TierTopicMaterializationUtils(new TierTopicMaterializationToolConfig(TierRecoveryConfig.toMaterializerProperties(props)), props, new HashMap<TopicIdPartition, Long>(this.fencedOffsetMap), scheduler);
        this.cancellationContext = CancellationContext.newContext();
        this.outputJsonFile = outputJsonFile;
        this.objStoreOpt = TierMetadataComparator.getObjectStoreMaybe(this.props);
        this.offsetScanFlag = props.containsKey("confluent.tier.recovery.validate") && Boolean.parseBoolean(props.getProperty("confluent.tier.recovery.validate"));
    }

    static Optional<TierObjectStore> getObjectStoreMaybe(Properties props) {
        if (props.containsKey("confluent.tier.recovery.validate") && Boolean.parseBoolean(props.getProperty("confluent.tier.recovery.validate"))) {
            TierObjectStore.Backend backend = TierObjectStore.Backend.valueOf(props.getProperty(KafkaConfig.TierBackendProp()));
            TierObjectStoreConfig config = TierObjectStoreUtils.generateBackendConfig(backend, props);
            TierObjectStore objStore = TierObjectStoreFactory.getObjectStoreInstance(backend, config);
            System.out.println("Initialized Backend: " + (Object)((Object)backend) + " with objStore: " + objStore);
            return Optional.of(objStore);
        }
        System.out.println("Not initializing any backend, will avoid doing cloud object presence check!");
        return Optional.empty();
    }

    public void run() {
        System.out.println("Starting TierMetadataComparator with properties: " + this.props + " for partitions: " + Arrays.toString(this.fenceEvents.toArray()));
        Path baseMaterializationPath = Paths.get(this.props.getProperty("confluent.tier.recovery.working.dir"), new String[0]);
        try {
            this.materializationUtils.run();
        }
        catch (Exception e) {
            System.err.println("Failed to materialize states from tier state topic, " + e);
            throw new IllegalStateException("Failed to materialize states from tier state topic", e);
        }
        System.out.println("Materialized base states at: " + baseMaterializationPath + " for topicIdPartitions: " + Arrays.toString(this.fencedOffsetMap.keySet().toArray()));
        Map<String, Path> serverMap = TierMetadataComparator.getVerifiedTierFolderMap(this.props);
        serverMap.put("rematerialized", baseMaterializationPath);
        List<ComparatorInfo.ComparatorReplicaInfo> allReplicaInfo = TierMetadataComparator.getReplicas(this.fencedOffsetMap.keySet(), serverMap);
        allReplicaInfo.forEach(replicaInfo -> TierMetadataComparator.validateTierStateAndUpdateInfo(replicaInfo, this.cancellationContext, this.materializationUtils::getStartOffset, this.objStoreOpt, this.offsetScanFlag, TierPartitionStatus.ERROR));
        System.out.println("Completed tier-state validation for info count: " + allReplicaInfo.size());
        TierMetadataComparator.generateChoiceAndWriteJsonOutput(this.fenceEvents, allReplicaInfo, this.outputJsonFile, this.fencedOffsetMap);
    }

    static void validateTierStateAndUpdateInfo(ComparatorInfo.ComparatorReplicaInfo info, CancellationContext cancellationContext, Function<TopicPartition, Long> startOffsetProducer, Optional<TierObjectStore> objStoreOpt, boolean offsetScanFlag, TierPartitionStatus requiredStatus) {
        try {
            TierMetadataValidator.TierMetadataValidatorResult validatorResult = TierMetadataValidator.validateStandaloneTierStateFile(info.tierStateFile(), info.topicIdPartition(), objStoreOpt, offsetScanFlag, cancellationContext, startOffsetProducer);
            if (validatorResult.valid) {
                if (!validatorResult.headerOpt.isPresent()) {
                    throw new IllegalStateException("Valid state must have a header.");
                }
                if (validatorResult.headerOpt.get().status() != requiredStatus) {
                    throw new IllegalStateException("Validated TierPartitionState must be in " + (Object)((Object)requiredStatus) + " status.");
                }
            }
            validatorResult.headerOpt.ifPresent(info::setHeader);
            info.setValidationSuccess(validatorResult.valid);
        }
        catch (Exception e) {
            System.err.println("Couldn't validate replicaInfo: " + info + " due to exception: " + e);
            info.setException(e);
        }
    }

    static Map<TopicIdPartition, Optional<ComparatorInfo.ComparatorReplicaInfo>> generateChoices(List<ComparatorInfo.ComparatorReplicaInfo> allInfoList, Map<TopicIdPartition, Long> fencedOffsetMap) {
        return allInfoList.stream().filter(ComparatorInfo.ComparatorReplicaInfo::isValidationSuccess).filter(info -> fencedOffsetMap.containsKey(info.topicIdPartition()) && (Long)fencedOffsetMap.get(info.topicIdPartition()) >= info.lastOffset()).collect(Collectors.groupingBy(ComparatorInfo.ComparatorReplicaInfo::topicIdPartition, Collectors.maxBy(Comparator.comparingLong(ComparatorInfo.ComparatorReplicaInfo::lastOffset))));
    }

    private static void generateChoiceAndWriteJsonOutput(List<FenceEventInfo> fencedEvents, List<ComparatorInfo.ComparatorReplicaInfo> infoList, Path outputJsonFile, Map<TopicIdPartition, Long> fencedOffsetMap) {
        Map<TopicIdPartition, Optional<ComparatorInfo.ComparatorReplicaInfo>> choiceMap = TierMetadataComparator.generateChoices(infoList, fencedOffsetMap);
        Map<TopicIdPartition, List<ComparatorInfo.ComparatorReplicaInfo>> replicaMap = infoList.stream().collect(Collectors.groupingBy(ComparatorInfo.ComparatorReplicaInfo::topicIdPartition));
        List<ComparatorInfo.ComparatorOutput> outputList = fencedEvents.stream().map(input -> {
            TopicIdPartition topicIdPartition = TierMetadataComparator.getTopicIdPartitionFromInput(input);
            Map<String, ComparatorInfo.ComparatorReplicaInfo> replicaInfoMap = ((List)replicaMap.get(topicIdPartition)).stream().collect(Collectors.toMap(ComparatorInfo.ComparatorReplicaInfo::getReplica, info -> info));
            ComparatorInfo.ComparatorReplicaInfo choice = (ComparatorInfo.ComparatorReplicaInfo)((Optional)choiceMap.get(topicIdPartition)).orElseGet(null);
            return new ComparatorInfo.ComparatorOutput(replicaInfoMap, choice, (FenceEventInfo)input);
        }).collect(Collectors.toList());
        try {
            ComparatorInfo.ComparatorOutput.writeJsonToFile(outputList, outputJsonFile);
            System.out.println("JSON Output written to: " + outputJsonFile);
        }
        catch (IOException e) {
            System.err.println("Error in writing out the Json output: " + outputJsonFile + "due to: " + e);
        }
    }

    static List<ComparatorInfo.ComparatorReplicaInfo> getReplicas(Set<TopicIdPartition> partitions, Map<String, Path> tierStateFolderMap) {
        ArrayList<ComparatorInfo.ComparatorReplicaInfo> allReplicaInfoList = new ArrayList<ComparatorInfo.ComparatorReplicaInfo>();
        for (String replicaId : tierStateFolderMap.keySet()) {
            Path tierStateFolder = tierStateFolderMap.get(replicaId);
            System.out.println("Generating info for replica: " + replicaId + " with folder: " + tierStateFolder);
            try {
                List infoList = TierMetadataValidator.snapshotStateFiles(tierStateFolder.toFile(), false, tierStateFolder.toString()).entrySet().stream().filter(entry -> partitions.contains(entry.getKey())).map(entry -> new ComparatorInfo.ComparatorReplicaInfo(replicaId, ((TierMetadataValidator.TierMetadataValidatorRecord)entry.getValue()).snapshot, (TopicIdPartition)entry.getKey())).collect(Collectors.toList());
                if (infoList.size() != partitions.size()) {
                    throw new IllegalStateException("Couldn't collect all partitions for replica: " + replicaId);
                }
                allReplicaInfoList.addAll(infoList);
            }
            catch (Exception e) {
                System.err.println("Error in creating replicaInfo for replica: " + replicaId + " due to: " + e);
            }
        }
        System.out.println("Generated allReplicaInfoList count: " + allReplicaInfoList.size());
        return allReplicaInfoList;
    }

    static Map<TopicIdPartition, Long> generateOffsetMapFromInput(List<FenceEventInfo> fencedEvents) {
        List fencedList = fencedEvents.stream().map(input -> {
            TopicIdPartition topicIdPartition = TierMetadataComparator.getTopicIdPartitionFromInput(input);
            return new AbstractMap.SimpleEntry<TopicIdPartition, Long>(topicIdPartition, input.recordOffset);
        }).collect(Collectors.toList());
        Set topicIdPartitionSet = fencedList.stream().map(Map.Entry::getKey).collect(Collectors.toSet());
        if (topicIdPartitionSet.size() != fencedList.size()) {
            throw new IllegalArgumentException("Duplicate topicIdPartitions as part of input fenced events: " + Arrays.toString(fencedEvents.toArray()));
        }
        Map<TopicIdPartition, Long> offsetMap = fencedList.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println("Generated offsetMap: " + Arrays.toString(offsetMap.keySet().toArray()));
        if (offsetMap.size() != fencedEvents.size()) {
            throw new IllegalArgumentException("Entire fencedEvents couldn't be converted to offsetMap!");
        }
        return offsetMap;
    }

    static TopicIdPartition getTopicIdPartitionFromInput(FenceEventInfo input) {
        UUID id = CoreUtils.uuidFromBase64(input.topicIdBase64);
        return new TopicIdPartition(input.topic, id, input.partition);
    }

    private static List<FenceEventInfo> getComparatorInput(Path inputJsonFile) {
        if (Files.notExists(inputJsonFile, new LinkOption[0]) || !Files.isRegularFile(inputJsonFile, new LinkOption[0])) {
            throw new IllegalArgumentException("Incorrect json file provided: " + inputJsonFile);
        }
        try {
            List<FenceEventInfo> inputList = FenceEventInfo.jsonToList(inputJsonFile);
            System.out.println("Received JSON input: " + Arrays.toString(inputList.toArray()));
            return inputList;
        }
        catch (JsonProcessingException e) {
            throw new IllegalStateException("Couldn't parse provided input JSON", e);
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Incorrect JSON file provided: " + inputJsonFile, e);
        }
    }

    static Map<String, Path> getVerifiedTierFolderMap(Properties props) {
        Object[] workingDirs = props.getProperty("confluent.tier.recovery.broker.workdir.list").split(",");
        if (workingDirs.length == 0) {
            throw new IllegalArgumentException("Received empty: confluent.tier.recovery.broker.workdir.list");
        }
        HashMap<String, Path> bootstrapServerMap = new HashMap<String, Path>();
        for (String string : workingDirs) {
            Path workdir = Paths.get(string, new String[0]);
            if (Files.notExists(workdir, new LinkOption[0]) || !Files.isDirectory(workdir, new LinkOption[0])) {
                throw new IllegalArgumentException("Incorrect workdir: " + workdir);
            }
            String replicaId = workdir.getFileName().toString();
            if (bootstrapServerMap.containsKey(replicaId)) {
                throw new IllegalArgumentException("Found duplicate replicaId " + replicaId + " in: " + Arrays.toString(workingDirs));
            }
            if ("rematerialized".equals(replicaId)) {
                throw new IllegalArgumentException("replicaId can't be: rematerialized");
            }
            bootstrapServerMap.put(replicaId, workdir);
        }
        return bootstrapServerMap;
    }

    static ArgumentParser createComparatorParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)TierMetadataComparator.class.getName()).defaultHelp(true).description("Compares the tier-state files across different brokers");
        parser.addArgument(new String[]{RecoveryUtils.makeArgument("tier.config")}).dest("tier.config").type(String.class).required(true).help("The path to a configuration file containing the required properties");
        parser.addArgument(new String[]{RecoveryUtils.makeArgument("input.json")}).dest("input.json").type(String.class).required(true).help("The path to a json file to be accepted as the input to the tool");
        parser.addArgument(new String[]{RecoveryUtils.makeArgument("output.json")}).dest("output.json").type(String.class).required(true).help("The path to a json file where the tool will generate the output");
        return parser;
    }

    static Properties fetchPropertiesFromArgs(String[] args, ArgumentParser parser) throws Exception {
        try {
            Properties props = new Properties();
            Namespace res = parser.parseArgs(args);
            String toolsConfigFile = res.getString("tier.config");
            System.out.println("TierMetadataComparator received properties file: " + toolsConfigFile);
            props.putAll((Map<?, ?>)Utils.loadProps((String)toolsConfigFile));
            for (String key : REQUIRED_PROPERTIES) {
                if (props.containsKey(key)) continue;
                throw new IllegalArgumentException("Properties doesn't contain key: " + key + ", allProps: " + props);
            }
            System.out.println("fetchPropertiesFromArgs received props: " + props);
            return props;
        }
        catch (IOException | ArgumentParserException e) {
            if (e instanceof ArgumentParserException) {
                parser.handleError((ArgumentParserException)e);
                throw e;
            }
            throw new IllegalArgumentException("Couldn't create properties from provided file!", e);
        }
    }

    static Path createJsonPathFromArgs(String[] args, ArgumentParser parser, String argKey) {
        try {
            Namespace res = parser.parseArgs(args);
            Path inputFile = Paths.get(res.getString(argKey), new String[0]);
            System.out.println("TierMetadataComparator received " + argKey + " file: " + inputFile);
            return inputFile;
        }
        catch (ArgumentParserException e) {
            throw new IllegalArgumentException("Couldn't create " + argKey + " from provided file!", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Received cmdline args: " + Arrays.toString(args));
        ArgumentParser cliParser = TierMetadataComparator.createComparatorParser();
        Properties props = null;
        try {
            props = TierMetadataComparator.fetchPropertiesFromArgs(args, cliParser);
        }
        catch (HelpScreenException e) {
            Exit.exit((int)0);
        }
        Path inputJsonFile = TierMetadataComparator.createJsonPathFromArgs(args, cliParser, "input.json");
        Path outputJsonFile = TierMetadataComparator.createJsonPathFromArgs(args, cliParser, "output.json");
        List<FenceEventInfo> partitions = TierMetadataComparator.getComparatorInput(inputJsonFile);
        KafkaScheduler scheduler = new KafkaScheduler(1, "tier-metadata-comparator-scheduler-", true);
        scheduler.startup();
        try (TierMetadataComparator comparator = new TierMetadataComparator(props, partitions, outputJsonFile, scheduler);){
            comparator.run();
        }
        catch (Exception e) {
            System.err.println("Received exception during comparator runtime");
            e.printStackTrace();
            Exit.exit((int)1);
        }
        finally {
            scheduler.shutdown();
        }
    }

    @Override
    public void close() {
        this.cancellationContext.cancel();
        this.objStoreOpt.ifPresent(objectStore -> TierObjectStoreFactory.closeBackendInstance(objectStore.getBackend()));
    }
}

