/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.AsyncProcessingTimeoutTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter;

@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window>
extends Trigger<T, W>
implements AsyncTriggerConverter {
    private static final long serialVersionUID = 1L;
    private final Trigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", (TypeSerializer)LongSerializer.INSTANCE);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (triggerResult.isFire()) {
            this.clear(window, ctx);
            return triggerResult;
        }
        ValueState timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
        Long timeoutTimestamp = (Long)timeoutState.value();
        if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp.longValue());
            timeoutState.clear();
            timeoutTimestamp = null;
        }
        if (timeoutTimestamp == null) {
            timeoutState.update((Object)nextFireTimestamp);
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
        return triggerResult;
    }

    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }
        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }
        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = (Long)timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp.longValue());
            timeoutTimestampState.clear();
        }
        this.nestedTrigger.clear(window, ctx);
    }

    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger<T, W>(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger<T, W>(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }

    @Nonnull
    public Object convertToAsync() {
        return AsyncProcessingTimeoutTrigger.of(AsyncTriggerConverter.convertToAsync(this.nestedTrigger), Duration.ofMillis(this.interval), this.resetTimerOnNewRecord, this.shouldClearOnTimeout);
    }
}

