package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.function.ThrowingRunnable;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncState.class */
public class BatchExecutionInternalTimeServiceWithAsyncState<K, N> extends BatchExecutionInternalTimeService<K, N> {
    private AsyncExecutionController<K> asyncExecutionController;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchExecutionInternalTimeServiceWithAsyncState(ProcessingTimeService processingTimeService, Triggerable<K, N> triggerable) {
        super(processingTimeService, triggerable);
    }

    public void setup(AsyncExecutionController<K> asyncExecutionController) {
        if (asyncExecutionController != null) {
            this.asyncExecutionController = asyncExecutionController;
        }
    }

    @Override // org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService
    public void setCurrentKey(K k) throws Exception {
        if (k != null && k.equals(this.currentKey)) {
            return;
        }
        this.currentWatermark = Long.MAX_VALUE;
        while (true) {
            TimerHeapInternalTimer<K, N> poll = this.eventTimeTimersQueue.poll();
            if (poll == null) {
                break;
            } else {
                maintainContextAndProcess(poll, () -> {
                    this.triggerTarget.onEventTime(poll);
                });
            }
        }
        while (true) {
            TimerHeapInternalTimer<K, N> poll2 = this.processingTimeTimersQueue.poll();
            if (poll2 == null) {
                this.currentWatermark = Long.MIN_VALUE;
                this.currentKey = k;
                return;
            }
            maintainContextAndProcess(poll2, () -> {
                this.triggerTarget.onProcessingTime(poll2);
            });
        }
    }

    private void maintainContextAndProcess(InternalTimer<K, N> internalTimer, ThrowingRunnable<Exception> throwingRunnable) {
        RecordContext<K> currentContext = this.asyncExecutionController.getCurrentContext();
        RecordContext<K> buildContext = this.asyncExecutionController.buildContext(internalTimer, internalTimer.getKey());
        buildContext.retain();
        this.asyncExecutionController.setCurrentContext(buildContext);
        this.asyncExecutionController.syncPointRequestWithCallback(throwingRunnable, true);
        buildContext.release();
        this.asyncExecutionController.setCurrentContext(currentContext);
    }
}
