package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.class */
/**
 * An {@link InternalTimerService} that holds timers only for the key that is currently set and
 * fires every pending timer when the key changes.
 *
 * <p>Timers are registered against the key most recently passed to {@link #setCurrentKey(Object)}.
 * When a different key (or {@code null}) is set, all queued event-time timers and then all queued
 * processing-time timers are handed to the {@link Triggerable} target. While those callbacks run,
 * the reported watermark is {@code Long.MAX_VALUE} and new timer registrations are ignored with a
 * warning, so a callback cannot keep the flush loop alive by re-registering timers.
 *
 * @param <K> type of the keys the timers are bound to
 * @param <N> type of the timer namespace
 */
public class BatchExecutionInternalTimeService<K, N> implements InternalTimerService<N> {
    private static final Logger LOG =
            LoggerFactory.getLogger(BatchExecutionInternalTimeService.class);

    /** Source of the value returned by {@link #currentProcessingTime()}. */
    private final ProcessingTimeService processingTimeService;

    /** Receiver of the timer callbacks fired from {@link #setCurrentKey(Object)}. */
    private final Triggerable<K, N> triggerTarget;

    /** Key that newly registered (and deleted) timers are bound to; {@code null} until first set. */
    private K currentKey;

    /**
     * {@code Long.MIN_VALUE} in normal operation; {@code Long.MAX_VALUE} while {@link
     * #setCurrentKey(Object)} is firing timers, which marks the service as quiesced.
     */
    private long currentWatermark = Long.MIN_VALUE;

    // NOTE(review): the second constructor argument is taken from a max-parallelism constant;
    // confirm against BatchExecutionInternalPriorityQueueSet what that parameter actually means.
    /** Processing-time timers registered for the current key and not yet fired or deleted. */
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
            processingTimeTimersQueue =
                    new BatchExecutionInternalPriorityQueueSet<>(
                            PriorityComparator.forPriorityComparableObjects(),
                            StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);

    /** Event-time timers registered for the current key and not yet fired or deleted. */
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
            eventTimeTimersQueue =
                    new BatchExecutionInternalPriorityQueueSet<>(
                            PriorityComparator.forPriorityComparableObjects(),
                            StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);

    /**
     * Creates a timer service with empty timer queues and no current key.
     *
     * <p>The decompiler reports that this constructor was originally package-private; it is kept
     * public here so that existing callers continue to compile.
     *
     * @param processingTimeService provider of the current processing time; must not be null
     * @param triggerable target that receives the timer callbacks; must not be null
     * @throws NullPointerException if either argument is null
     */
    public BatchExecutionInternalTimeService(
            ProcessingTimeService processingTimeService, Triggerable<K, N> triggerable) {
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.triggerTarget = Preconditions.checkNotNull(triggerable);
    }

    /** Returns the current processing time as reported by the {@link ProcessingTimeService}. */
    @Override
    public long currentProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }

    /**
     * Returns {@code Long.MIN_VALUE}, or {@code Long.MAX_VALUE} while timers are being fired from
     * {@link #setCurrentKey(Object)}.
     */
    @Override
    public long currentWatermark() {
        return currentWatermark;
    }

    /**
     * Queues a processing-time timer for the current key.
     *
     * <p>If the service is quiesced (timers are currently being fired), the request is dropped and
     * a warning is logged instead.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override
    public void registerProcessingTimeTimer(N n, long j) {
        // MAX_VALUE means we are inside setCurrentKey's flush, i.e. called from a timer callback.
        if (currentWatermark == Long.MAX_VALUE) {
            LOG.warn(
                    "Timer service is quiesced. Processing time timer for timestamp '{}' will be ignored.",
                    j);
            return;
        }
        processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(j, currentKey, n));
    }

    /**
     * Queues an event-time timer for the current key.
     *
     * <p>If the service is quiesced (timers are currently being fired), the request is dropped and
     * a warning is logged instead.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override
    public void registerEventTimeTimer(N n, long j) {
        // MAX_VALUE means we are inside setCurrentKey's flush, i.e. called from a timer callback.
        if (currentWatermark == Long.MAX_VALUE) {
            LOG.warn(
                    "Timer service is quiesced. Event time timer for timestamp '{}' will be ignored.",
                    j);
            return;
        }
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(j, currentKey, n));
    }

    /**
     * Removes the processing-time timer for the current key with the given namespace and timestamp
     * from the queue.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override
    public void deleteProcessingTimeTimer(N n, long j) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(j, currentKey, n));
    }

    /**
     * Removes the event-time timer for the current key with the given namespace and timestamp from
     * the queue.
     *
     * @param n namespace of the timer
     * @param j timestamp of the timer
     */
    @Override
    public void deleteEventTimeTimer(N n, long j) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(j, currentKey, n));
    }

    /**
     * Not supported by this timer service.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void forEachEventTimeTimer(
            BiConsumerWithException<N, Long, Exception> biConsumerWithException) {
        throw new UnsupportedOperationException(
                "The BatchExecutionInternalTimeService should not be used in State Processor API.");
    }

    /**
     * Not supported by this timer service.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void forEachProcessingTimeTimer(
            BiConsumerWithException<N, Long, Exception> biConsumerWithException) {
        throw new UnsupportedOperationException(
                "The BatchExecutionInternalTimeService should not be used in State Processor API.");
    }

    /**
     * Switches the service to a new key, firing all timers still queued for the previous one.
     *
     * <p>If {@code k} is non-null and equal to the current key, nothing happens. Otherwise all
     * queued event-time timers are fired first, followed by all queued processing-time timers;
     * afterwards {@code k} becomes the current key. A {@code null} argument therefore always
     * triggers a flush.
     *
     * @param k the key to make current; may be {@code null}
     * @throws Exception if a timer callback throws; in that case the remaining timers are not
     *     fired, the key is not switched and the service stays quiesced
     */
    public void setCurrentKey(K k) throws Exception {
        if (k != null && k.equals(currentKey)) {
            return;
        }

        // Quiesce: registrations made from within the callbacks below are rejected.
        currentWatermark = Long.MAX_VALUE;

        InternalTimer<K, N> timer;
        while ((timer = eventTimeTimersQueue.poll()) != null) {
            triggerTarget.onEventTime(timer);
        }
        while ((timer = processingTimeTimersQueue.poll()) != null) {
            triggerTarget.onProcessingTime(timer);
        }

        currentWatermark = Long.MIN_VALUE;
        currentKey = k;
    }
}
