package org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncContinuousEventTimeTrigger.class */
public class AsyncContinuousEventTimeTrigger<W extends Window> extends AsyncTrigger<Object, W> {
    private static final long serialVersionUID = 1;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncContinuousEventTimeTrigger$Min.class */
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1;

        private Min() {
        }

        @Override // org.apache.flink.api.common.functions.ReduceFunction
        public Long reduce(Long l, Long l2) throws Exception {
            return Long.valueOf(Math.min(l.longValue(), l2.longValue()));
        }
    }

    private AsyncContinuousEventTimeTrigger(long j) {
        this.interval = j;
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onElement(Object obj, long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        if (w.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
            return StateFutureUtils.completedFuture(TriggerResult.FIRE);
        }
        triggerContext.registerEventTimeTimer(w.maxTimestamp());
        ReducingState reducingState = (ReducingState) triggerContext.getPartitionedState(this.stateDesc);
        return reducingState.asyncGet().thenCompose(l -> {
            if (l == null) {
                registerNextFireTimestamp(j - (j % this.interval), w, triggerContext, reducingState);
            }
            return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onEventTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        if (j == w.maxTimestamp()) {
            return StateFutureUtils.completedFuture(TriggerResult.FIRE);
        }
        ReducingState reducingState = (ReducingState) triggerContext.getPartitionedState(this.stateDesc);
        return reducingState.asyncGet().thenCompose(l -> {
            return (l == null || l.longValue() != j) ? StateFutureUtils.completedFuture(TriggerResult.CONTINUE) : reducingState.asyncClear().thenCompose(r13 -> {
                return registerNextFireTimestamp(j, w, triggerContext, reducingState);
            }).thenApply(r2 -> {
                return TriggerResult.FIRE;
            });
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onProcessingTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<Void> clear(W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        ReducingState reducingState = (ReducingState) triggerContext.getPartitionedState(this.stateDesc);
        return reducingState.asyncGet().thenCompose(l -> {
            if (l == null) {
                return StateFutureUtils.completedVoidFuture();
            }
            triggerContext.deleteEventTimeTimer(l.longValue());
            return reducingState.asyncClear();
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public boolean canMerge() {
        return true;
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public void onMerge(W w, AsyncTrigger.OnMergeContext onMergeContext) throws Exception {
        throw new RuntimeException("Merge window not support");
    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> AsyncContinuousEventTimeTrigger<W> of(Duration duration) {
        return new AsyncContinuousEventTimeTrigger<>(duration.toMillis());
    }

    private StateFuture<Void> registerNextFireTimestamp(long j, W w, AsyncTrigger.TriggerContext triggerContext, ReducingState<Long> reducingState) throws Exception {
        long min = Math.min(j + this.interval, w.maxTimestamp());
        return reducingState.asyncAdd(Long.valueOf(min)).thenAccept(r7 -> {
            triggerContext.registerEventTimeTimer(min);
        });
    }
}
