package org.apache.flink.datastream.impl.extension.window.operators;

import java.util.Collection;
import org.apache.flink.api.common.state.v2.AppendingState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.window.context.DefaultOneInputWindowContext;
import org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext;
import org.apache.flink.datastream.impl.extension.window.function.InternalOneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.operators.MergingWindowSet;
import org.apache.flink.datastream.impl.extension.window.utils.WindowUtils;
import org.apache.flink.datastream.impl.operators.BaseKeyedProcessOperator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.internal.InternalAppendingState;
import org.apache.flink.runtime.state.v2.internal.InternalListState;
import org.apache.flink.runtime.state.v2.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/datastream/impl/extension/window/operators/OneInputWindowProcessOperator.class */
public class OneInputWindowProcessOperator<K, IN, OUT, W extends Window> extends BaseKeyedProcessOperator<K, IN, OUT> implements Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    private final OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction;
    private final long allowedLateness;
    private transient InternalTimerService<W> internalTimerService;
    private final TypeSerializer<W> windowSerializer;
    private final WindowAssigner<? super IN, W> windowAssigner;
    private transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    private final Trigger<? super IN, ? super W> trigger;
    private transient WindowTriggerContext<K, IN, W> triggerContext;
    private transient DefaultOneInputWindowContext<K, IN, W> windowFunctionContext;
    private final StateDescriptor<IN> windowStateDescriptor;
    private transient InternalAppendingState<K, W, IN, IN, StateIterator<IN>, Iterable<IN>> windowState;
    private transient InternalMergingState<K, W, IN, IN, StateIterator<IN>, Iterable<IN>> windowMergingState;
    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;

    public OneInputWindowProcessOperator(InternalOneInputWindowStreamProcessFunction<IN, OUT, W> internalOneInputWindowStreamProcessFunction, WindowAssigner<? super IN, W> windowAssigner, Trigger<? super IN, ? super W> trigger, TypeSerializer<W> typeSerializer, StateDescriptor<IN> stateDescriptor, long j) {
        super(internalOneInputWindowStreamProcessFunction);
        Preconditions.checkArgument(j >= 0);
        this.windowProcessFunction = internalOneInputWindowStreamProcessFunction.getWindowProcessFunction();
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.windowSerializer = typeSerializer;
        this.windowStateDescriptor = stateDescriptor;
        this.allowedLateness = j;
    }

    @Override // org.apache.flink.datastream.impl.operators.BaseKeyedProcessOperator, org.apache.flink.datastream.impl.operators.ProcessOperator
    public void open() throws Exception {
        super.open();
        this.internalTimerService = getInternalTimerService("process-window-timers", this.windowSerializer, this);
        if (this.windowStateDescriptor != null) {
            this.windowState = getOrCreateKeyedState((Window) this.windowSerializer.createInstance(), this.windowSerializer, this.windowStateDescriptor);
        }
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            if (this.windowState instanceof InternalMergingState) {
                this.windowMergingState = this.windowState;
            } else if (this.windowState != null) {
                throw new IllegalStateException("The window uses a merging assigner, but the window state is not mergeable.");
            }
            this.mergingSetsState = getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE.createInstance(), VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer})));
            this.mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
        }
        this.triggerContext = new WindowTriggerContext<>(null, null, this, this.internalTimerService, this.trigger, this.windowSerializer);
        this.windowAssignerContext = new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.datastream.impl.extension.window.operators.OneInputWindowProcessOperator.1
            public long getCurrentProcessingTime() {
                return OneInputWindowProcessOperator.this.internalTimerService.currentProcessingTime();
            }
        };
        this.windowFunctionContext = new DefaultOneInputWindowContext<>(null, this.windowState, this.windowProcessFunction, this, this.windowSerializer, this.windowMergingState != null);
    }

    @Override // org.apache.flink.datastream.impl.operators.ProcessOperator
    public void close() throws Exception {
        super.close();
        this.triggerContext = null;
        this.windowFunctionContext = null;
        this.windowAssignerContext = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.datastream.impl.operators.ProcessOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<Window> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        boolean z = true;
        final Object currentKey = getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet mergingWindowSet = getMergingWindowSet();
            for (Window window : assignWindows) {
                Window addWindow = mergingWindowSet.addWindow(window, new MergingWindowSet.MergeFunction<W>() { // from class: org.apache.flink.datastream.impl.extension.window.operators.OneInputWindowProcessOperator.2
                    /* JADX WARN: Multi-variable type inference failed */
                    public void merge(W w, Collection<W> collection, W w2, Collection<W> collection2) throws Exception {
                        if (OneInputWindowProcessOperator.this.windowAssigner.isEventTime() && w.maxTimestamp() + OneInputWindowProcessOperator.this.allowedLateness <= OneInputWindowProcessOperator.this.internalTimerService.currentWatermark()) {
                            UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current event time: " + OneInputWindowProcessOperator.this.internalTimerService.currentWatermark() + " window: " + unsupportedOperationException);
                            throw unsupportedOperationException;
                        }
                        if (!OneInputWindowProcessOperator.this.windowAssigner.isEventTime()) {
                            long currentProcessingTime = OneInputWindowProcessOperator.this.internalTimerService.currentProcessingTime();
                            if (w.maxTimestamp() <= currentProcessingTime) {
                                UnsupportedOperationException unsupportedOperationException2 = new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + currentProcessingTime + " window: " + unsupportedOperationException2);
                                throw unsupportedOperationException2;
                            }
                        }
                        OneInputWindowProcessOperator.this.triggerContext.setKey(currentKey);
                        OneInputWindowProcessOperator.this.triggerContext.setWindow(w);
                        OneInputWindowProcessOperator.this.triggerContext.onMerge(collection);
                        for (W w3 : collection) {
                            OneInputWindowProcessOperator.this.triggerContext.setWindow(w3);
                            OneInputWindowProcessOperator.this.triggerContext.clear();
                            WindowUtils.deleteCleanupTimer(w3, OneInputWindowProcessOperator.this.windowAssigner, OneInputWindowProcessOperator.this.triggerContext, OneInputWindowProcessOperator.this.allowedLateness);
                        }
                        OneInputWindowProcessOperator.this.windowMergingState.mergeNamespaces(w2, collection2);
                    }

                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // org.apache.flink.datastream.impl.extension.window.operators.MergingWindowSet.MergeFunction
                    public /* bridge */ /* synthetic */ void merge(Object obj, Collection collection, Object obj2, Collection collection2) throws Exception {
                        merge((Collection) obj, (Collection<Collection>) collection, (Collection) obj2, (Collection<Collection>) collection2);
                    }
                });
                if (WindowUtils.isWindowLate(addWindow, this.windowAssigner, this.internalTimerService, this.allowedLateness)) {
                    mergingWindowSet.retireWindow(addWindow);
                } else {
                    z = false;
                    Window stateWindow = mergingWindowSet.getStateWindow(addWindow);
                    if (stateWindow == null) {
                        throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                    }
                    this.windowState.setCurrentNamespace(stateWindow);
                    this.outputCollector.setTimestamp(window.maxTimestamp());
                    this.windowFunctionContext.setWindow(window);
                    this.windowProcessFunction.onRecord(streamRecord.getValue(), this.outputCollector, this.partitionedContext, this.windowFunctionContext);
                    this.triggerContext.setKey(currentKey);
                    this.triggerContext.setWindow(addWindow);
                    TriggerResult onElement = this.triggerContext.onElement(streamRecord);
                    if (onElement.isFire()) {
                        emitWindowContents(addWindow);
                    }
                    if (onElement.isPurge()) {
                        this.windowState.clear();
                    }
                    WindowUtils.registerCleanupTimer(addWindow, this.windowAssigner, this.triggerContext, this.allowedLateness);
                }
            }
            mergingWindowSet.persist();
        } else {
            for (Window window2 : assignWindows) {
                if (!WindowUtils.isWindowLate(window2, this.windowAssigner, this.internalTimerService, this.allowedLateness)) {
                    z = false;
                    this.windowState.setCurrentNamespace(window2);
                    this.outputCollector.setTimestamp(window2.maxTimestamp());
                    this.windowFunctionContext.setWindow(window2);
                    this.windowProcessFunction.onRecord(streamRecord.getValue(), this.outputCollector, this.partitionedContext, this.windowFunctionContext);
                    this.triggerContext.setKey(currentKey);
                    this.triggerContext.setWindow(window2);
                    TriggerResult onElement2 = this.triggerContext.onElement(streamRecord);
                    if (onElement2.isFire()) {
                        emitWindowContents(window2);
                    }
                    if (onElement2.isPurge()) {
                        this.windowState.clear();
                    }
                    WindowUtils.registerCleanupTimer(window2, this.windowAssigner, this.triggerContext, this.allowedLateness);
                }
            }
        }
        if (z && WindowUtils.isElementLate(streamRecord, this.windowAssigner, this.allowedLateness, this.internalTimerService)) {
            this.windowProcessFunction.onLateRecord(streamRecord.getValue(), this.outputCollector, this.partitionedContext);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        MergingWindowSet<W> mergingWindowSet;
        this.triggerContext.setKey(internalTimer.getKey());
        this.triggerContext.setWindow((Window) internalTimer.getNamespace());
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(this.triggerContext.getWindow());
            if (stateWindow == null) {
                return;
            } else {
                this.windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            this.windowState.setCurrentNamespace(this.triggerContext.getWindow());
            mergingWindowSet = null;
        }
        TriggerResult onEventTime = this.triggerContext.onEventTime(internalTimer.getTimestamp());
        if (onEventTime.isFire()) {
            emitWindowContents(this.triggerContext.getWindow());
        }
        if (onEventTime.isPurge()) {
            this.windowState.clear();
        }
        if (this.windowAssigner.isEventTime() && WindowUtils.isCleanupTime(this.triggerContext.getWindow(), internalTimer.getTimestamp(), this.windowAssigner, this.allowedLateness)) {
            clearAllState(this.triggerContext.getWindow(), this.windowState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        MergingWindowSet<W> mergingWindowSet;
        this.triggerContext.setKey(internalTimer.getKey());
        this.triggerContext.setWindow((Window) internalTimer.getNamespace());
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(this.triggerContext.getWindow());
            if (stateWindow == null) {
                return;
            } else {
                this.windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            this.windowState.setCurrentNamespace(this.triggerContext.getWindow());
            mergingWindowSet = null;
        }
        TriggerResult onProcessingTime = this.triggerContext.onProcessingTime(internalTimer.getTimestamp());
        if (onProcessingTime.isFire()) {
            emitWindowContents(this.triggerContext.getWindow());
        }
        if (onProcessingTime.isPurge()) {
            this.windowState.clear();
        }
        if (!this.windowAssigner.isEventTime() && WindowUtils.isCleanupTime(this.triggerContext.getWindow(), internalTimer.getTimestamp(), this.windowAssigner, this.allowedLateness)) {
            clearAllState(this.triggerContext.getWindow(), this.windowState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    @Override // org.apache.flink.datastream.impl.operators.ProcessOperator
    protected ProcessingTimeManager getProcessingTimeManager() {
        return UnsupportedProcessingTimeManager.INSTANCE;
    }

    private void clearAllState(W w, AppendingState<IN, StateIterator<IN>, Iterable<IN>> appendingState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        appendingState.clear();
        this.triggerContext.clear();
        this.windowFunctionContext.setWindow(w);
        this.windowProcessFunction.onClear(this.outputCollector, this.partitionedContext, this.windowFunctionContext);
        if (mergingWindowSet != null) {
            mergingWindowSet.retireWindow(w);
            mergingWindowSet.persist();
        }
    }

    private void emitWindowContents(W w) throws Exception {
        this.outputCollector.setTimestamp(w.maxTimestamp());
        this.windowFunctionContext.setWindow(w);
        this.windowProcessFunction.onTrigger(this.outputCollector, this.partitionedContext, this.windowFunctionContext);
    }

    protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
        return new MergingWindowSet<>(this.windowAssigner, this.mergingSetsState);
    }
}
