/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.server.starter.helix;

import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.server.starter.helix.IngestionBasedConsumptionStatusChecker;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class FreshnessBasedConsumptionStatusChecker
extends IngestionBasedConsumptionStatusChecker {
    private final long _minFreshnessMs;

    public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments, long minFreshnessMs) {
        super(instanceDataManager, consumingSegments);
        this._minFreshnessMs = minFreshnessMs;
    }

    private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) {
        if (currentOffset != null && latestOffset != null) {
            return currentOffset.compareTo((Object)latestOffset) >= 0;
        }
        return false;
    }

    protected long now() {
        return System.currentTimeMillis();
    }

    @Override
    protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataManager rtSegmentDataManager) {
        StreamPartitionMsgOffset latestStreamOffset;
        long now = this.now();
        long latestIngestionTimestamp = rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
        long freshnessMs = now - latestIngestionTimestamp;
        if (latestIngestionTimestamp >= 0L && freshnessMs <= this._minFreshnessMs) {
            this._logger.info("Segment {} with freshness {}ms has caught up within min freshness {}", new Object[]{segmentName, freshnessMs, this._minFreshnessMs});
            return true;
        }
        StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
        if (this.isOffsetCaughtUp(currentOffset, latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000L))) {
            this._logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}.But the current ingested offset is equal to the latest available offset {}.", new Object[]{segmentName, freshnessMs, this._minFreshnessMs, currentOffset});
            return true;
        }
        this._logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. At offset {}. Latest offset {}.", new Object[]{segmentName, freshnessMs, this._minFreshnessMs, currentOffset, latestStreamOffset});
        return false;
    }
}

