/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.log.Log;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.state.FileTierPartitionIterator;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.Header;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.TierObjectStoreUtils;
import kafka.tier.tools.TierCloudBackendUtils;
import kafka.tier.tools.TierObjectStoreFactory;
import kafka.tier.tools.TierRecoveryConfig;
import kafka.tier.tools.TierTopicMaterializationToolConfig;
import kafka.tier.tools.TierTopicMaterializationUtils;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TierMetadataValidator
implements AutoCloseable {
    private Map<TopicIdPartition, TierMetadataValidatorRecord> stateMap = new HashMap<TopicIdPartition, TierMetadataValidatorRecord>();
    public final String metadataStatesDir;
    public final String workDir;
    private static final String SNAPSHOT_DIR_SUFFIX = "snapshots";
    public Properties props = new Properties();
    private TierObjectStore objectStore;
    private boolean validateAgainstObjectStore;
    private boolean verifyOffsetScanAgainstObjectStore = false;
    private TierObjectStore.Backend backend = null;
    TierTopicMaterializationUtils utils;
    private final CancellationContext cancellationContext;
    private final Scheduler scheduler;
    private static final int OBJECT_STORE_RETRY_COUNT = 3;
    private static final long OBJECT_STORE_BACKOFF_MS = 1000L;
    private static final String OFFSET_SCAN_PREFIX = "[OFFSET_SCAN] ";
    private static final Logger log = LoggerFactory.getLogger(TierMetadataValidator.class);

    TierMetadataValidator(String[] args, Scheduler scheduler) {
        this.parseArgs(args);
        this.workDir = this.props.getProperty("working-dir");
        this.metadataStatesDir = this.props.getProperty("metadata-states-dir");
        this.validateAgainstObjectStore = (Boolean)this.props.get("validate-tier-storage");
        if (this.validateAgainstObjectStore) {
            this.backend = (TierObjectStore.Backend)((Object)this.props.get(KafkaConfig.TierBackendProp()));
            this.verifyOffsetScanAgainstObjectStore = (Boolean)this.props.get("validate-tier-storage-offset");
            if (this.verifyOffsetScanAgainstObjectStore && this.backend != TierObjectStore.Backend.S3) {
                throw new IllegalArgumentException("Unsupported backend for offset scan: " + (Object)((Object)this.backend));
            }
            TierObjectStoreConfig objectStoreConfig = TierObjectStoreUtils.generateBackendConfig(this.backend, this.props);
            this.objectStore = TierObjectStoreFactory.getObjectStoreInstance(this.backend, objectStoreConfig);
            log.info("Successfully created backend: {}", (Object)this.backend);
        }
        this.cancellationContext = CancellationContext.newContext();
        this.scheduler = scheduler;
    }

    private void parseArgs(String[] args) {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec workingDirSpec = parser.accepts("working-dir", "working-dir is the directory path where the tool will generate its data").withRequiredArg().describedAs("working-dir").ofType(String.class).defaultsTo((Object)"/tmp/workdir", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec metaStatesDirSpec = parser.accepts("metadata-states-dir", "data directory of kafka, to pull tier topic state files.").withRequiredArg().describedAs("metadata-states-dir").ofType(String.class);
        ArgumentAcceptingOptionSpec bootStrapServerSpec = parser.accepts("bootstrap-server", "The broker server and port string in form host:port").withRequiredArg().describedAs("The broker server and port string in form host:port").ofType(String.class).defaultsTo((Object)"localhost:9092", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec tierSytateTopicPartitionSpec = parser.accepts("tier-state-topic-partition", "tier topic partition. If selected all the processing will be limited from this partition.").withRequiredArg().describedAs("tier topic partition. If selected all the processing will be limited from this partition.").ofType(Integer.class).defaultsTo((Object)-1, (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec dumpEventsSpec = parser.accepts("dump-events", "dumps the tier state topic's events on std out. By default its turned to be true. If chosen false it will still dump everty 1000th events for trackinglong runs.").withRequiredArg().describedAs("dumps the tier state topic's events on std out. By default its turned to be true. If chosen false it will still dump everty 1000th events for trackinglong runs.").ofType(Boolean.class).defaultsTo((Object)true, (Object[])new Boolean[0]);
        ArgumentAcceptingOptionSpec snapshotStatesSpec = parser.accepts("snapshot-states-files", "If not set then it's expected that the state files are already saved in the snapshot folder.").withRequiredArg().describedAs("If not set then it's expected that the state files are already saved in the snapshot folder.").ofType(Boolean.class).defaultsTo((Object)true, (Object[])new Boolean[0]);
        TierCloudBackendUtils.augmentParserWithValidatorOpts(parser);
        TierCloudBackendUtils.augmentParserWithTierBackendOpts(parser);
        OptionSet options = parser.parse(args);
        if (options.hasArgument((OptionSpec)metaStatesDirSpec) ^ (Boolean)options.valueOf((OptionSpec)snapshotStatesSpec)) {
            throw new IllegalStateException("Only one of metadata-states-dir or snapshot-states-files should be specified.");
        }
        if (options.hasArgument((OptionSpec)metaStatesDirSpec)) {
            this.props.put("metadata-states-dir", options.valueOf((OptionSpec)metaStatesDirSpec));
        }
        this.props.put("working-dir", options.valueOf((OptionSpec)workingDirSpec));
        this.props.put("dump-metadata", "true");
        this.props.put("bootstrap-server", options.valueOf((OptionSpec)bootStrapServerSpec));
        this.props.put("tier-state-topic-partition", options.valueOf((OptionSpec)tierSytateTopicPartitionSpec));
        this.props.put("dump-events", options.valueOf((OptionSpec)dumpEventsSpec));
        this.props.put("snapshot-states-files", options.valueOf((OptionSpec)snapshotStatesSpec));
        TierCloudBackendUtils.addValidatorProps(options, this.props);
        TierCloudBackendUtils.addTierBackendProps(options, this.props);
        log.info("Starting Validation with following args: {}", (Object)this.props);
    }

    private void createWorkDir(String dir) {
        File file = new File(dir);
        File snapshotDir = new File(TierMetadataValidator.getSnapshotDir(dir));
        if (this.props.get("snapshot-states-files").equals(false)) {
            if (!file.exists() || !snapshotDir.exists()) {
                throw new IllegalStateException(dir + " and " + snapshotDir.toPath() + " should exist.");
            }
            return;
        }
        if (!file.exists()) {
            file.mkdirs();
        }
        if (!file.isDirectory() || file.listFiles().length != 0) {
            throw new IllegalStateException("materialization-path needs to be directory and should be empty");
        }
        if (!snapshotDir.exists()) {
            snapshotDir.mkdir();
        }
        if (!snapshotDir.isDirectory() || snapshotDir.listFiles().length != 0) {
            throw new IllegalStateException("snapshot path " + snapshotDir.getAbsolutePath() + " needs to be directory and should be empty");
        }
    }

    static String getSnapshotDir(String dir) {
        return Paths.get(dir, SNAPSHOT_DIR_SUFFIX).toString();
    }

    static Path getSnapshotFilePath(TopicPartition id, String baseDir) {
        return Paths.get(baseDir, id.topic() + "-" + id.partition());
    }

    public void run() throws IOException {
        this.createWorkDir(this.workDir);
        log.info("**** Fetching target partition states from folder.");
        boolean skipPopulate = this.props.get("snapshot-states-files").equals(false);
        File tierStateFolder = skipPopulate ? new File(TierMetadataValidator.getSnapshotDir(this.workDir)) : new File(this.props.getProperty("metadata-states-dir"));
        this.stateMap = TierMetadataValidator.snapshotStateFiles(tierStateFolder, !skipPopulate, this.workDir);
        HashMap<TopicIdPartition, Long> offsetMap = new HashMap<TopicIdPartition, Long>();
        for (Map.Entry<TopicIdPartition, TierMetadataValidatorRecord> entry : this.stateMap.entrySet()) {
            TopicIdPartition id = entry.getKey();
            offsetMap.put(id, entry.getValue().maxOffset);
        }
        log.info("**** Calling materialization for following partition to offset mapping: {}", offsetMap);
        if (offsetMap.size() != 0 && !this.props.get("tier-state-topic-partition").equals(-1)) {
            int endOffset = ((Long)Collections.max(offsetMap.values())).intValue();
            this.props.put("end-offset", (Object)endOffset);
            log.info("Setting end-offset to {}", (Object)endOffset);
        }
        this.utils = new TierTopicMaterializationUtils(new TierTopicMaterializationToolConfig(this.props), TierRecoveryConfig.toConsumerProps(this.props), offsetMap, this.scheduler);
        this.utils.run();
        log.info("**** Calling validator.");
        for (TopicIdPartition topicIdPartition : this.utils.stateMap.keySet()) {
            try {
                Path aFile;
                Path eFile = this.utils.getTierStateFile(topicIdPartition);
                if (!this.validateStates(eFile, aFile = this.stateMap.get((Object)topicIdPartition).snapshot, topicIdPartition.topicPartition(), this.utils.getStartOffset(topicIdPartition.topicPartition()))) continue;
                log.info("Metadata states is consistent {} Vs {}", (Object)eFile, (Object)aFile);
            }
            catch (Exception ex) {
                log.info("Ignoring comparison for non local.", (Throwable)ex);
            }
        }
    }

    static Map<TopicIdPartition, TierMetadataValidatorRecord> snapshotStateFiles(File tierStateFolder, boolean populate, String workDir) throws IOException {
        if (!tierStateFolder.isDirectory()) {
            throw new IllegalStateException(tierStateFolder + " is not metadata states directory");
        }
        HashMap<TopicIdPartition, TierMetadataValidatorRecord> stateMap = new HashMap<TopicIdPartition, TierMetadataValidatorRecord>();
        for (File dir : tierStateFolder.listFiles()) {
            File snapshotFile;
            TopicPartition topicPartition;
            if (!dir.isDirectory()) continue;
            try {
                topicPartition = Log.parseTopicPartitionName(dir);
            }
            catch (KafkaException ex) {
                continue;
            }
            File file = snapshotFile = populate ? TierMetadataValidator.getSnapshotFilePath(topicPartition, TierMetadataValidator.getSnapshotDir(workDir)).toFile() : TierMetadataValidator.getSnapshotFilePath(topicPartition, tierStateFolder.getAbsolutePath()).toFile();
            if (populate) {
                if (!snapshotFile.exists()) {
                    snapshotFile.mkdir();
                }
                log.info("Found TierTopicPartition dir {}", (Object)dir.toPath());
            }
            for (File file2 : dir.listFiles()) {
                if (!file2.isFile() || !Log.isTierStateFile(file2)) continue;
                Path ss = Paths.get(snapshotFile.toString(), file2.getName());
                if (populate) {
                    log.info("Taking snapshot of partition states for {}", (Object)topicPartition);
                    Files.copy(file2.toPath(), ss, new CopyOption[0]);
                    log.info("Copied state files: {}", (Object)ss);
                }
                TierMetadataValidatorRecord record = new TierMetadataValidatorRecord(ss, topicPartition);
                stateMap.put(record.id, record);
            }
        }
        if (stateMap.isEmpty()) {
            throw new IllegalStateException("Can not find any metadata states file in " + tierStateFolder);
        }
        return stateMap;
    }

    private static boolean mayBeActiveObject(long startOffset, TierObjectMetadata metadata) {
        ImmutableList inActiveStateList = ImmutableList.of((Object)((Object)TierObjectMetadata.State.SEGMENT_FENCED), (Object)((Object)TierObjectMetadata.State.SEGMENT_DELETE_COMPLETE), (Object)((Object)TierObjectMetadata.State.SEGMENT_DELETE_INITIATE), (Object)((Object)TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE));
        ImmutableList nonCommittedList = ImmutableList.of((Object)((Object)TierObjectMetadata.State.SEGMENT_FENCED));
        if (startOffset != -2L) {
            return startOffset <= metadata.endOffset() && !nonCommittedList.contains((Object)metadata.state());
        }
        return !inActiveStateList.contains((Object)metadata.state());
    }

    public boolean validateStates(Path expected, Path actual, TopicPartition id, long startOffset) throws IOException {
        block3: {
            block2: {
                FileTierPartitionIterator eiterator = TierMetadataValidator.getTierPartitionIterator(id, expected);
                FileTierPartitionIterator aiterator = TierMetadataValidator.getTierPartitionIterator(id, actual);
                Optional<TierObjectStore> objectStoreOpt = Optional.ofNullable(this.objectStore);
                if (!this.comparesStates(actual, expected)) break block2;
                if (TierMetadataValidator.isValidStates((Iterator<TierObjectMetadata>)((Object)eiterator), (Iterator<TierObjectMetadata>)((Object)aiterator), startOffset, objectStoreOpt, this.verifyOffsetScanAgainstObjectStore, this.cancellationContext, this.utils::getStartOffset)) break block3;
            }
            log.info("Metadata inconsistencies({}): {} Vs {}", new Object[]{id, actual, expected});
            return false;
        }
        return true;
    }

    static TierMetadataValidatorResult validateStandaloneTierStateFile(Path tierStateFile, TopicIdPartition id, Optional<TierObjectStore> objStoreOpt, boolean verifyOffsetScan, CancellationContext cancellationContext, Function<TopicPartition, Long> startOffsetProducer) throws IOException {
        long currentStartOffset = startOffsetProducer.apply(id.topicPartition());
        Path copiedTierStateFile = Paths.get(tierStateFile.toUri());
        FileTierPartitionIterator expIterator = TierMetadataValidator.getTierPartitionIterator(id.topicPartition(), tierStateFile);
        FileTierPartitionIterator actIterator = TierMetadataValidator.getTierPartitionIterator(id.topicPartition(), copiedTierStateFile);
        boolean isValid = TierMetadataValidator.isValidStates((Iterator<TierObjectMetadata>)((Object)expIterator), (Iterator<TierObjectMetadata>)((Object)actIterator), currentStartOffset, objStoreOpt, verifyOffsetScan, cancellationContext, startOffsetProducer);
        Optional<Header> headerOpt = FileTierPartitionState.readHeader(FileChannel.open(tierStateFile, StandardOpenOption.READ));
        if (isValid && headerOpt.isPresent()) {
            log.info("Metadata state is consistent for file: {}", (Object)tierStateFile);
        } else {
            log.info("Metadata state is inconsistent for file: {}", (Object)tierStateFile);
        }
        return new TierMetadataValidatorResult(isValid, headerOpt);
    }

    static FileTierPartitionIterator getTierPartitionIterator(TopicPartition id, Path tierStateFile) throws IOException {
        FileChannel channel = FileChannel.open(tierStateFile, StandardOpenOption.READ);
        return FileTierPartitionState.iterator(id, channel).orElseGet(() -> {
            throw new IllegalStateException("Couldn't create tierPartitionIterator for: " + id + " from file: " + tierStateFile);
        });
    }

    public boolean comparesStates(Path actual, Path expected) throws IOException {
        byte[] f2;
        Header aheader;
        FileChannel echannel = FileChannel.open(actual, StandardOpenOption.READ);
        FileChannel achannel = FileChannel.open(expected, StandardOpenOption.READ);
        Header eheader = FileTierPartitionState.readHeader(echannel).get();
        if (!eheader.equals(aheader = FileTierPartitionState.readHeader(achannel).get())) {
            log.error("Metadata states(header) inconsistency {} Vs {}", (Object)eheader, (Object)aheader);
            return false;
        }
        byte[] f1 = Files.readAllBytes(expected);
        if (!Arrays.equals(f1, f2 = Files.readAllBytes(actual))) {
            log.info("Metadata inconsistency(files do not match).");
            return false;
        }
        return true;
    }

    private static boolean objectExistsOnTierStore(TierObjectMetadata tierMetadata, TierObjectStore objStore, boolean offsetScan, CancellationContext cancellationContext) throws InterruptedException {
        return TierMetadataValidator.checkObjectStoreExistenceWithRetries(tierMetadata, objStore, offsetScan, 0, cancellationContext, null);
    }

    private static boolean checkObjectStoreExistenceWithRetries(TierObjectMetadata tierMetadata, TierObjectStore objStore, boolean offsetScan, int retryCount, CancellationContext cancellationContext, TierObjectStoreRetriableException lastSeenException) throws InterruptedException {
        if (retryCount >= 3) {
            log.error("checkObjectStoreExistenceWithRetries reached maximum retries #{} for object: {}", (Object)retryCount, (Object)tierMetadata);
            throw lastSeenException;
        }
        TierObjectStore.ObjectMetadata objMetadata = new TierObjectStore.ObjectMetadata(tierMetadata);
        try {
            TierObjectStoreResponse objStoreResponse = offsetScan ? objStore.getObject(objMetadata, TierObjectStore.FileType.SEGMENT) : objStore.getObject(objMetadata, TierObjectStore.FileType.SEGMENT, 0, 0);
            return TierMetadataValidator.handleObjectStoreResponse(objStoreResponse, tierMetadata, offsetScan, objStore.getBackend(), cancellationContext);
        }
        catch (TierObjectStoreRetriableException e) {
            log.debug("Received Transient error from ObjectStore; will retry!", (Throwable)((Object)e));
            long sleepDuration = 1000L * (long)(1 + retryCount);
            log.debug("ObjectStore retryCount#{}. Going to sleep for {}ms", (Object)retryCount, (Object)sleepDuration);
            Thread.sleep(sleepDuration);
            return TierMetadataValidator.checkObjectStoreExistenceWithRetries(tierMetadata, objStore, offsetScan, retryCount + 1, cancellationContext, e);
        }
        catch (Exception e) {
            log.error("ObjectStore:{}, actualObj: {} raised fatal error!", new Object[]{objStore, tierMetadata, e});
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static boolean handleObjectStoreResponse(TierObjectStoreResponse response, TierObjectMetadata object, boolean offsetScan, TierObjectStore.Backend backend, CancellationContext cancellationContext) {
        InputStream inputStream = response.getInputStream();
        try {
            if (offsetScan) {
                if (backend != TierObjectStore.Backend.S3) {
                    throw new UnsupportedOperationException("[OFFSET_SCAN] Unsupported Backend for offset scan: " + (Object)((Object)backend));
                }
                log.debug("[OFFSET_SCAN] Beginning to perform offset scan for object: " + object);
                TierSegmentReader reader = new TierSegmentReader(OFFSET_SCAN_PREFIX);
                long cumulativeVerifiedSize = 0L;
                long lastRetrievedOffset = -2L;
                while (!cancellationContext.isCancelled() && lastRetrievedOffset < object.endOffset()) {
                    RecordBatch batch = reader.readBatch(inputStream, object.size());
                    if (lastRetrievedOffset == -2L && batch.baseOffset() != object.baseOffset()) {
                        log.error("[OFFSET_SCAN] Offset mismatch between first batch offset: " + batch.baseOffset() + " and metadata base offset: " + object.baseOffset() + " for object: " + object);
                        boolean bl = false;
                        return bl;
                    }
                    if (lastRetrievedOffset != -2L && batch.baseOffset() - lastRetrievedOffset != 1L) {
                        log.error("Metadata inconsistency between S3 record batches: Received batch.baseOffset(): " + batch.baseOffset() + " after lastRetrievedOffset: " + lastRetrievedOffset + " for object: " + object);
                    }
                    lastRetrievedOffset = batch.lastOffset();
                    cumulativeVerifiedSize += (long)batch.sizeInBytes();
                }
                if (cancellationContext.isCancelled()) {
                    log.debug("[OFFSET_SCAN] Cancelled after verifying till: " + lastRetrievedOffset + " for object: " + object);
                }
                if (lastRetrievedOffset != object.endOffset()) {
                    log.error("[OFFSET_SCAN] Metadata inconsistency, couldn't verify till end of segment: " + lastRetrievedOffset + " vs " + object.endOffset() + " for object: " + object);
                    boolean bl = false;
                    return bl;
                }
                if (cumulativeVerifiedSize != (long)object.size()) {
                    log.error("[OFFSET_SCAN] Metadata inconsistency, couldn't verify the entire bytes in the segment. ByteCount:" + cumulativeVerifiedSize + " vs " + object.size() + " for object: " + object);
                    boolean bl = false;
                    return bl;
                }
            } else if (inputStream.read() < 0) {
                log.error("Received empty response for object: " + object);
                boolean reader = false;
                return reader;
            }
            log.debug("[OFFSET_SCAN] Successfully validated from object: " + object);
            boolean reader = true;
            return reader;
        }
        catch (Exception e) {
            log.error("[OFFSET_SCAN] Encountered error while handling response for object: " + object + " with exception", (Throwable)e);
            boolean bl = false;
            return bl;
        }
        finally {
            try {
                inputStream.close();
            }
            catch (IOException ioException) {
                log.debug("Received error while closing the S3 inputStream", (Throwable)ioException);
            }
        }
    }

    public static OffsetValidationResult verifyObjectInBackend(TierObjectMetadata objectMetadata, long firstValidOffset, TierObjectStore objStore, boolean offsetScan, CancellationContext cancellationContext, Function<TopicPartition, Long> startOffsetProducer) {
        boolean objectPresentInTierStore = false;
        Object exception = null;
        try {
            objectPresentInTierStore = TierMetadataValidator.objectExistsOnTierStore(objectMetadata, objStore, offsetScan, cancellationContext);
        }
        catch (InterruptedException | TierObjectStoreRetriableException tierStoreEx) {
            exception = tierStoreEx;
        }
        OffsetValidationResult result = new OffsetValidationResult(true, firstValidOffset, (Exception)exception);
        if (!objectPresentInTierStore) {
            long updatedFirstValidOffset = startOffsetProducer.apply(objectMetadata.topicIdPartition().topicPartition());
            boolean activeSegment = true;
            if (updatedFirstValidOffset > firstValidOffset) {
                log.debug("Updated firstValidOffset from: {} to: {}", (Object)firstValidOffset, (Object)updatedFirstValidOffset);
                firstValidOffset = updatedFirstValidOffset;
                activeSegment = TierMetadataValidator.mayBeActiveObject(firstValidOffset, objectMetadata);
                result.firstValidOffset = firstValidOffset;
            }
            if (activeSegment && updatedFirstValidOffset != -2L) {
                log.error("ObjectStore inconsistency. Object: " + objectMetadata + " not found in objectStore: " + objStore);
                result.result = false;
            } else {
                log.debug("Ignoring inactive Object at offset: " + objectMetadata.baseOffset());
            }
        }
        return result;
    }

    public static boolean isValidStates(Iterator<TierObjectMetadata> eIterator, Iterator<TierObjectMetadata> aIterator, long firstValidOffset, Optional<TierObjectStore> objectStoreOpt, boolean verifyOffsetScan, CancellationContext cancellationContext, Function<TopicPartition, Long> startOffsetProducer) {
        long prevEndOffset = -1L;
        while (eIterator.hasNext()) {
            if (!aIterator.hasNext()) {
                log.error("Metadata inconsistency(more states) for #expected > #actual");
                return false;
            }
            TierObjectMetadata expectedObject = eIterator.next();
            TierObjectMetadata actualObject = aIterator.next();
            boolean active = TierMetadataValidator.mayBeActiveObject(firstValidOffset, actualObject);
            if (actualObject.equals(expectedObject)) {
                long start = Math.max(expectedObject.baseOffset(), prevEndOffset + 1L);
                if (actualObject.state().equals((Object)TierObjectMetadata.State.SEGMENT_FENCED)) continue;
                if (start - prevEndOffset != 1L || actualObject.endOffset() <= prevEndOffset) {
                    if (active) {
                        log.error("Metadata offset inconsistency: ({}:{}) for object: {} ", new Object[]{actualObject.baseOffset(), actualObject.endOffset(), actualObject});
                        return false;
                    }
                } else if (objectStoreOpt.isPresent() && active && actualObject.state() == TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE) {
                    OffsetValidationResult result = TierMetadataValidator.verifyObjectInBackend(actualObject, firstValidOffset, objectStoreOpt.get(), verifyOffsetScan, cancellationContext, startOffsetProducer);
                    if (result.firstValidOffset > firstValidOffset) {
                        firstValidOffset = result.firstValidOffset;
                    }
                    if (!result.result) {
                        return false;
                    }
                }
            } else {
                log.error("Metadata states inconsistency at " + actualObject);
                return false;
            }
            prevEndOffset = expectedObject.endOffset();
        }
        if (aIterator.hasNext()) {
            log.error("Metadata inconsistency(more states) for #expected < #actual");
            return false;
        }
        return true;
    }

    @Override
    public void close() {
        if (this.validateAgainstObjectStore && this.backend != null) {
            this.cancellationContext.cancel();
            TierObjectStoreFactory.closeBackendInstance(this.backend);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            log.error("At least metadata-states-dir needs to be set.");
            System.exit(1);
        }
        KafkaScheduler scheduler = new KafkaScheduler(1, "tier-metadata-validator-scheduler-", true);
        scheduler.startup();
        try (TierMetadataValidator validator = new TierMetadataValidator(args, scheduler);){
            validator.run();
        }
        catch (Exception ae) {
            log.error("TierMetadataValidator::main received error!", (Throwable)ae);
            ae.printStackTrace();
            System.exit(1);
        }
        finally {
            scheduler.shutdown();
        }
    }

    public static class OffsetValidationResult {
        public boolean result;
        public long firstValidOffset;
        public final Exception exception;

        OffsetValidationResult(boolean result, long firstValidOffset, Exception exception) {
            this.result = result;
            this.firstValidOffset = firstValidOffset;
            this.exception = exception;
        }
    }

    static class TierMetadataValidatorRecord {
        public Path snapshot;
        public TopicIdPartition id;
        public long maxOffset;

        public TierMetadataValidatorRecord(Path stateFile, TopicPartition topicPartition) throws IOException {
            FileChannel fileChannel = FileChannel.open(stateFile, StandardOpenOption.READ);
            Optional<Header> headerOpt = FileTierPartitionState.readHeader(fileChannel);
            if (!headerOpt.isPresent()) {
                return;
            }
            Header header = headerOpt.get();
            this.snapshot = stateFile;
            this.id = new TopicIdPartition(topicPartition.topic(), header.topicId(), topicPartition.partition());
            this.maxOffset = header.localMaterializedOffsetAndEpoch().offset();
        }

        public String toString() {
            return "TierMetadataValidatorRecord{snapshot=" + this.snapshot + ", id=" + this.id + ", maxOffset=" + this.maxOffset + '}';
        }
    }

    static class TierMetadataValidatorResult {
        final boolean valid;
        final Optional<Header> headerOpt;

        public TierMetadataValidatorResult(boolean valid, Optional<Header> headerOpt) {
            this.valid = valid;
            this.headerOpt = headerOpt;
        }
    }
}

