/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.server.starter.helix;

import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.server.starter.helix.IngestionBasedConsumptionStatusChecker;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class OffsetBasedConsumptionStatusChecker
extends IngestionBasedConsumptionStatusChecker {
    public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
        super(instanceDataManager, consumingSegments);
    }

    @Override
    protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager) {
        StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
        StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
        if (latestStreamOffset == null || latestIngestedOffset == null) {
            this._logger.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. Will check consumption status later", new Object[]{segmentName, latestStreamOffset, latestIngestedOffset});
            return false;
        }
        if (latestIngestedOffset.compareTo((Object)latestStreamOffset) < 0) {
            this._logger.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ", new Object[]{latestIngestedOffset, segmentName, latestStreamOffset});
            return false;
        }
        this._logger.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", new Object[]{segmentName, latestIngestedOffset, latestStreamOffset});
        return true;
    }
}

