/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import io.confluent.kafka.availability.FilesWrapper;
import io.confluent.kafka.availability.ThreadCountersManager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import kafka.server.LogDirFailureChannel;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.topic.TierTopicManagerConfig;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.compat.java8.JFunction;

public class TierTopicManagerCommitter {
    public static final VersionInfo CURRENT_VERSION = VersionInfo.VERSION_1;
    private static final Logger log = LoggerFactory.getLogger(TierTopicManagerCommitter.class);
    private static final int NO_EPOCH = -1;
    private static final String SEPARATOR = " ";
    private final TierTopicManagerConfig config;
    private final LogDirFailureChannel logDirFailureChannel;
    private final Map<Integer, OffsetAndEpoch> partitionToPosition = new ConcurrentHashMap<Integer, OffsetAndEpoch>();

    public TierTopicManagerCommitter(TierTopicManagerConfig config, LogDirFailureChannel logDirFailureChannel) {
        if (config.logDirs.size() != 1) {
            throw new UnsupportedOperationException("TierTopicManager does not currently support multiple logdirs.");
        }
        this.config = config;
        this.logDirFailureChannel = logDirFailureChannel;
        this.clearTempFiles();
        this.loadOffsets();
    }

    public void updatePosition(int partition, OffsetAndEpoch updateTo) {
        OffsetAndEpoch current = this.partitionToPosition.getOrDefault(partition, OffsetAndEpoch.EMPTY);
        if (current.offset() >= updateTo.offset()) {
            throw new IllegalStateException("Illegal offset in " + updateTo + " with current position " + current);
        }
        if (current.epoch().isPresent() && updateTo.epoch().isPresent() && current.epoch().get() > updateTo.epoch().get()) {
            throw new IllegalStateException("Illegal epoch in " + updateTo + " with current position " + current);
        }
        this.partitionToPosition.put(partition, updateTo);
        log.debug("Committer position updated {}:{}", (Object)partition, (Object)updateTo);
    }

    public OffsetAndEpoch positionFor(int partitionId) {
        return this.partitionToPosition.get(partitionId);
    }

    public synchronized Map<Integer, OffsetAndEpoch> takePositionsSnapshot() {
        return new HashMap<Integer, OffsetAndEpoch>(this.partitionToPosition);
    }

    public void writePositionsSnapshot(Map<Integer, OffsetAndEpoch> positions) {
        this.writeOffsets(CURRENT_VERSION, positions);
    }

    static Map<Integer, OffsetAndEpoch> earliestOffsets(List<Map<Integer, OffsetAndEpoch>> diskOffsets) {
        if (diskOffsets.stream().map(Map::keySet).collect(Collectors.toSet()).size() != 1) {
            return Collections.emptyMap();
        }
        HashMap<Integer, OffsetAndEpoch> minimum = new HashMap<Integer, OffsetAndEpoch>();
        for (Map<Integer, OffsetAndEpoch> offsets : diskOffsets) {
            log.debug("Loading offsets from logdir {}.", diskOffsets);
            for (Map.Entry<Integer, OffsetAndEpoch> entry : offsets.entrySet()) {
                minimum.compute(entry.getKey(), (k, v) -> {
                    if (v == null || ((OffsetAndEpoch)entry.getValue()).offset() < v.offset()) {
                        return (OffsetAndEpoch)entry.getValue();
                    }
                    return v;
                });
            }
        }
        log.debug("Minimum offsets found {}.", minimum);
        return minimum;
    }

    private static String commitPath(String logDir) {
        return logDir + "/tier.offsets";
    }

    private static String commitTempFilename(String logDir) {
        return TierTopicManagerCommitter.commitPath(logDir) + ".tmp";
    }

    private void clearTempFiles() {
        for (String logDir : this.config.logDirs) {
            try {
                FilesWrapper.deleteIfExists((Path)Paths.get(TierTopicManagerCommitter.commitTempFilename(logDir), new String[0]));
            }
            catch (IOException ioe) {
                this.logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func(() -> "Failed to delete temporory tier offsets in logdir."), ioe);
            }
        }
    }

    static Map<Integer, OffsetAndEpoch> committed(String logDir, LogDirFailureChannel logDirFailureChannel) {
        return (Map)ThreadCountersManager.wrapIO(() -> {
            Map<Integer, OffsetAndEpoch> map;
            Throwable throwable;
            BufferedReader br;
            Throwable throwable2;
            FileReader fr;
            block30: {
                block31: {
                    block28: {
                        block29: {
                            fr = new FileReader(TierTopicManagerCommitter.commitPath(logDir));
                            throwable2 = null;
                            br = new BufferedReader(fr);
                            throwable = null;
                            String versionLine = br.readLine();
                            map = TierTopicManagerCommitter.readPayload(br, TierTopicManagerCommitter.readVersion(versionLine));
                            if (br == null) break block28;
                            if (throwable == null) break block29;
                            try {
                                br.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            break block28;
                        }
                        br.close();
                    }
                    if (fr == null) break block30;
                    if (throwable2 == null) break block31;
                    try {
                        fr.close();
                    }
                    catch (Throwable throwable4) {
                        throwable2.addSuppressed(throwable4);
                    }
                    break block30;
                }
                fr.close();
            }
            return map;
            {
                catch (Throwable throwable5) {
                    try {
                        try {
                            try {
                                try {
                                    throwable = throwable5;
                                    throw throwable5;
                                }
                                catch (Throwable throwable6) {
                                    if (br != null) {
                                        if (throwable != null) {
                                            try {
                                                br.close();
                                            }
                                            catch (Throwable throwable7) {
                                                throwable.addSuppressed(throwable7);
                                            }
                                        } else {
                                            br.close();
                                        }
                                    }
                                    throw throwable6;
                                }
                            }
                            catch (Throwable throwable8) {
                                throwable2 = throwable8;
                                throw throwable8;
                            }
                        }
                        catch (Throwable throwable9) {
                            if (fr != null) {
                                if (throwable2 != null) {
                                    try {
                                        fr.close();
                                    }
                                    catch (Throwable throwable10) {
                                        throwable2.addSuppressed(throwable10);
                                    }
                                } else {
                                    fr.close();
                                }
                            }
                            throw throwable9;
                        }
                    }
                    catch (FileNotFoundException fnf) {
                        log.info("TierTopicManager offsets not found. This is expected if this is the first time starting up with tiered storage.", (Throwable)fnf);
                    }
                    catch (IOException ioe) {
                        log.error("Error loading TierTopicManager offsets. Setting logdir offline.", (Throwable)ioe);
                        logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func(() -> "Failed to commit tier offsets to logdir."), ioe);
                    }
                    catch (Exception e) {
                        log.warn("Exception encountered when reading tier checkpoint file. Resetting offsets.", (Throwable)e);
                    }
                }
            }
            return Collections.emptyMap();
        });
    }

    private static VersionInfo readVersion(String line) {
        return VersionInfo.toVersionInfo(Integer.parseInt(line));
    }

    private static Map<Integer, OffsetAndEpoch> readPayload(BufferedReader br, VersionInfo versionInfo) throws IOException {
        String line;
        HashMap<Integer, OffsetAndEpoch> committedPositions = new HashMap<Integer, OffsetAndEpoch>();
        while ((line = br.readLine()) != null) {
            String[] values = line.split(SEPARATOR);
            if (values.length == versionInfo.numFields) {
                OffsetAndEpoch previousPosition;
                int deserializedEpoch;
                int partitionId = Integer.parseInt(values[0]);
                long offset = Long.parseLong(values[1]);
                Optional<Integer> epoch = Optional.empty();
                if (versionInfo.version > VersionInfo.VERSION_0.version && (deserializedEpoch = Integer.parseInt(values[2])) != -1) {
                    epoch = Optional.of(deserializedEpoch);
                }
                if ((previousPosition = committedPositions.put(partitionId, new OffsetAndEpoch(offset, epoch))) == null) continue;
                throw new IllegalStateException("Found duplicate positions for partition " + partitionId);
            }
            throw new IllegalStateException("Committed offsets found in incorrect format on line " + line);
        }
        return committedPositions;
    }

    private void loadOffsets() {
        Map<Integer, OffsetAndEpoch> earliest = TierTopicManagerCommitter.earliestOffsets(this.config.logDirs.stream().map(logDir -> TierTopicManagerCommitter.committed(logDir, this.logDirFailureChannel)).collect(Collectors.toList()));
        this.partitionToPosition.clear();
        this.partitionToPosition.putAll(earliest);
    }

    void writeOffsets(VersionInfo versionInfo, Map<Integer, OffsetAndEpoch> offsets) {
        ThreadCountersManager.wrapIOVoid(() -> {
            for (String logDir : this.config.logDirs) {
                try {
                    try (FileWriter fw = new FileWriter(TierTopicManagerCommitter.commitTempFilename(logDir));
                         BufferedWriter bw = new BufferedWriter(fw);){
                        bw.write(String.valueOf(versionInfo.version));
                        bw.newLine();
                        for (Map.Entry entry : offsets.entrySet()) {
                            int partitionId = (Integer)entry.getKey();
                            long offset = ((OffsetAndEpoch)entry.getValue()).offset();
                            int epoch = ((OffsetAndEpoch)entry.getValue()).epoch().orElse(-1);
                            bw.write(partitionId + SEPARATOR + offset);
                            if (versionInfo.version > VersionInfo.VERSION_0.version) {
                                bw.write(SEPARATOR + epoch);
                            }
                            bw.newLine();
                        }
                    }
                    Utils.atomicMoveWithFallback((Path)Paths.get(TierTopicManagerCommitter.commitTempFilename(logDir), new String[0]), (Path)Paths.get(TierTopicManagerCommitter.commitPath(logDir), new String[0]));
                }
                catch (IOException ioe) {
                    this.logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func(() -> "Failed to commit tier offsets to logdir."), ioe);
                }
            }
        });
    }

    static enum VersionInfo {
        VERSION_0(0, 2),
        VERSION_1(1, 3);

        private static final Map<Integer, VersionInfo> VERSION_MAP;
        final int version;
        final int numFields;

        private VersionInfo(int version, int numFields) {
            this.version = version;
            this.numFields = numFields;
        }

        public static VersionInfo toVersionInfo(int version) {
            VersionInfo versionInfo = VERSION_MAP.get(version);
            if (versionInfo == null) {
                throw new IllegalStateException("Unknown version " + version);
            }
            return versionInfo;
        }

        static {
            VERSION_MAP = new HashMap<Integer, VersionInfo>();
            for (VersionInfo versionInfo : VersionInfo.values()) {
                VersionInfo oldVersion = VERSION_MAP.put(versionInfo.version, versionInfo);
                if (oldVersion == null) continue;
                throw new ExceptionInInitializerError("Found duplicate version " + versionInfo.version);
            }
        }
    }
}

