package org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@Experimental
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.class */
public class AsyncCountTrigger<W extends Window> extends AsyncTrigger<Object, W> {
    private static final long serialVersionUID = 1;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>(TaskManagerMetricsInfo.GarbageCollectorInfo.FIELD_NAME_COUNT, new Sum(), LongSerializer.INSTANCE);

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger$Sum.class */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1;

        private Sum() {
        }

        public Long reduce(Long l, Long l2) throws Exception {
            return Long.valueOf(l.longValue() + l2.longValue());
        }
    }

    private AsyncCountTrigger(long j) {
        this.maxCount = j;
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onElement(Object obj, long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        ReducingState partitionedState = triggerContext.getPartitionedState(this.stateDesc);
        return partitionedState.asyncAdd(1L).thenCompose(r3 -> {
            return partitionedState.asyncGet();
        }).thenCompose(l -> {
            return l.longValue() >= this.maxCount ? partitionedState.asyncClear().thenCompose(r2 -> {
                return StateFutureUtils.completedFuture(TriggerResult.FIRE);
            }) : StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
        });
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onEventTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) {
        return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<TriggerResult> onProcessingTime(long j, W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public StateFuture<Void> clear(W w, AsyncTrigger.TriggerContext triggerContext) throws Exception {
        return triggerContext.getPartitionedState(this.stateDesc).asyncClear();
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public boolean canMerge() {
        return true;
    }

    @Override // org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger
    public void onMerge(W w, AsyncTrigger.OnMergeContext onMergeContext) throws Exception {
        onMergeContext.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "CountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> AsyncCountTrigger<W> of(long j) {
        return new AsyncCountTrigger<>(j);
    }
}
