/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import java.util.Objects;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@Experimental
public class AsyncProcessingTimeoutTrigger<T, W extends Window>
extends AsyncTrigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final AsyncTrigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    public AsyncProcessingTimeoutTrigger(AsyncTrigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", (TypeSerializer)LongSerializer.INSTANCE);
    }

    public StateFuture<TriggerResult> onElement(T element, long timestamp, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return this.nestedTrigger.onElement(element, timestamp, window, ctx).thenConditionallyCompose(TriggerResult::isFire, triggerResult -> this.clear(window, ctx).thenApply(ignore -> triggerResult), triggerResult -> {
            ValueState timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            return timeoutState.asyncValue().thenConditionallyCompose(Objects::nonNull, timeoutTimestamp -> {
                if (this.resetTimerOnNewRecord) {
                    ctx.deleteProcessingTimeTimer(timeoutTimestamp.longValue());
                    return timeoutState.asyncClear().thenApply(ignore -> null);
                }
                return StateFutureUtils.completedFuture((Object)timeoutTimestamp);
            }).thenConditionallyCompose(tuple -> tuple.f1 == null, ignore -> timeoutState.asyncUpdate((Object)nextFireTimestamp).thenAccept(ignore2 -> ctx.registerProcessingTimeTimer(nextFireTimestamp))).thenApply(ignore -> triggerResult);
        }).thenApply(tuple -> (TriggerResult)tuple.f1);
    }

    public StateFuture<TriggerResult> onProcessingTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return this.nestedTrigger.onProcessingTime(time, window, ctx).thenCompose(triggerResult -> {
            TriggerResult finalResult = triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
            return this.shouldClearOnTimeout ? this.clear(window, ctx).thenApply(ignore -> finalResult) : StateFutureUtils.completedFuture((Object)finalResult);
        });
    }

    public StateFuture<TriggerResult> onEventTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return this.nestedTrigger.onEventTime(time, window, ctx).thenCompose(triggerResult -> {
            TriggerResult finalResult = triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
            return this.shouldClearOnTimeout ? this.clear(window, ctx).thenApply(ignore -> finalResult) : StateFutureUtils.completedFuture((Object)finalResult);
        });
    }

    public StateFuture<Void> clear(W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        ValueState timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        return timeoutTimestampState.asyncValue().thenConditionallyCompose(Objects::nonNull, timeoutTimestamp -> {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp.longValue());
            return timeoutTimestampState.asyncClear();
        }).thenCompose(ignore -> this.nestedTrigger.clear(window, ctx));
    }

    public String toString() {
        return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(AsyncTrigger<T, W> nestedTrigger, Duration timeout) {
        return new AsyncProcessingTimeoutTrigger<T, W>(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(AsyncTrigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new AsyncProcessingTimeoutTrigger<T, W>(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }

    @VisibleForTesting
    public AsyncTrigger<T, W> getNestedTrigger() {
        return this.nestedTrigger;
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    @VisibleForTesting
    public boolean isResetTimerOnNewRecord() {
        return this.resetTimerOnNewRecord;
    }

    @VisibleForTesting
    public boolean isShouldClearOnTimeout() {
        return this.shouldClearOnTimeout;
    }

    @VisibleForTesting
    public ValueStateDescriptor<Long> getTimeoutStateDesc() {
        return this.timeoutStateDesc;
    }
}

