/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.extension.window.operators;

import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.v2.AppendingState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
import org.apache.flink.datastream.api.extension.window.function.TwoOutputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.window.context.DefaultOneInputWindowContext;
import org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext;
import org.apache.flink.datastream.impl.extension.window.function.InternalTwoOutputWindowStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.window.operators.MergingWindowSet;
import org.apache.flink.datastream.impl.extension.window.utils.WindowUtils;
import org.apache.flink.datastream.impl.operators.BaseKeyedTwoOutputProcessOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.internal.InternalAppendingState;
import org.apache.flink.runtime.state.v2.internal.InternalListState;
import org.apache.flink.runtime.state.v2.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * Keyed window operator with two outputs.
 *
 * <p>Each incoming record is assigned to windows by the configured {@link WindowAssigner}, handed
 * to the user's {@link TwoOutputWindowStreamProcessFunction}, and then offered to the {@link
 * Trigger}. Timers registered for a window are delivered back to this operator through the
 * {@link Triggerable} callbacks.
 *
 * @param <K> type of the key of the input stream.
 * @param <IN> type of the input records.
 * @param <OUT_MAIN> type of the records emitted to the main output.
 * @param <OUT_SIDE> type of the records emitted to the side output.
 * @param <W> type of window produced by the window assigner.
 */
public class TwoOutputWindowProcessOperator<K, IN, OUT_MAIN, OUT_SIDE, W extends Window>
extends BaseKeyedTwoOutputProcessOperator<K, IN, OUT_MAIN, OUT_SIDE>
implements Triggerable<K, W> {
    private static final long serialVersionUID = 1L;
    // User function that receives records, trigger firings, clear and late-record callbacks.
    private final TwoOutputWindowStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE> windowProcessFunction;
    // Non-negative (checked in the constructor); passed to the WindowUtils lateness and
    // cleanup-timer helpers.
    private final long allowedLateness;
    // Timer service named "process-window-timers", obtained in open() with the window as namespace.
    private transient InternalTimerService<W> internalTimerService;
    // Serializer for windows; used as the namespace serializer for window state and timers.
    private final TypeSerializer<W> windowSerializer;
    // Decides which window(s) each record belongs to.
    private final WindowAssigner<? super IN, W> windowAssigner;
    // Context given to the assigner; exposes the timer service's current processing time.
    private transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    // Decides, per record and per timer, whether a window fires and/or is purged.
    private final Trigger<? super IN, ? super W> trigger;
    // Re-targeted to a (key, window) pair before every trigger invocation.
    private transient WindowTriggerContext<K, IN, W> triggerContext;
    // Window context passed to the user function; re-targeted to the current window before each call.
    private transient DefaultOneInputWindowContext<K, IN, W> windowFunctionContext;
    // Descriptor of the per-window state; open() only creates windowState when this is non-null.
    private final StateDescriptor<IN> windowStateDescriptor;
    // Per-window state, namespaced by window.
    private transient InternalAppendingState<K, W, IN, IN, StateIterator<IN>, Iterable<IN>> windowState;
    // Same object as windowState, set in open() only for a merging assigner with mergeable state.
    private transient InternalMergingState<K, W, IN, IN, StateIterator<IN>, Iterable<IN>> windowMergingState;
    // "merging-window-set" list state of window pairs, kept under VoidNamespace; created in open()
    // only for a merging assigner.
    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;

    /**
     * Creates the operator.
     *
     * @param windowProcessFunction internal wrapper; the user-facing window function is taken from
     *     its {@code getWindowProcessFunction()}.
     * @param outputTag tag of the side output, forwarded to the base operator.
     * @param mainOutKeySelector optional key selector for main-output records, forwarded to the
     *     base operator.
     * @param sideOutKeySelector optional key selector for side-output records, forwarded to the
     *     base operator.
     * @param windowAssigner assigns each record to zero or more windows.
     * @param trigger decides when a window fires and/or is purged.
     * @param windowSerializer serializer for the window type.
     * @param windowStateDescriptor descriptor of the per-window state; {@code open()} skips state
     *     creation when this is {@code null}.
     * @param allowedLateness lateness tolerated before a window is considered late; must be
     *     non-negative, otherwise {@code Preconditions.checkArgument} rejects it.
     */
    public TwoOutputWindowProcessOperator(
            InternalTwoOutputWindowStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE, W> windowProcessFunction,
            OutputTag<OUT_SIDE> outputTag,
            @Nullable KeySelector<OUT_MAIN, K> mainOutKeySelector,
            @Nullable KeySelector<OUT_SIDE, K> sideOutKeySelector,
            WindowAssigner<? super IN, W> windowAssigner,
            Trigger<? super IN, ? super W> trigger,
            TypeSerializer<W> windowSerializer,
            StateDescriptor<IN> windowStateDescriptor,
            long allowedLateness) {
        super(windowProcessFunction, outputTag, mainOutKeySelector, sideOutKeySelector);
        Preconditions.checkArgument(allowedLateness >= 0L);

        // User function.
        this.windowProcessFunction = windowProcessFunction.getWindowProcessFunction();

        // Windowing strategy.
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.allowedLateness = allowedLateness;

        // State configuration.
        this.windowSerializer = windowSerializer;
        this.windowStateDescriptor = windowStateDescriptor;
    }

    /**
     * Initializes the runtime pieces of the operator: the window timer service, the per-window
     * state (when a descriptor was supplied), the merging bookkeeping state (for merging
     * assigners only), and the contexts handed to the assigner, the trigger and the user function.
     *
     * @throws IllegalStateException if the assigner is a merging assigner but the window state
     *     exists and is not an {@link InternalMergingState}.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open() throws Exception {
        super.open();

        this.internalTimerService =
                this.getInternalTimerService("process-window-timers", this.windowSerializer, this);

        if (this.windowStateDescriptor != null) {
            this.windowState =
                    (InternalAppendingState)
                            this.getOrCreateKeyedState(
                                    this.windowSerializer.createInstance(),
                                    this.windowSerializer,
                                    this.windowStateDescriptor);
        }

        if (this.windowAssigner instanceof MergingWindowAssigner) {
            this.setUpMergingState();
        }

        this.windowAssignerContext =
                new WindowAssigner.WindowAssignerContext() {
                    @Override
                    public long getCurrentProcessingTime() {
                        return TwoOutputWindowProcessOperator.this.internalTimerService
                                .currentProcessingTime();
                    }
                };

        // Key and window start out unset; they are assigned before each trigger invocation.
        this.triggerContext =
                new WindowTriggerContext<>(
                        null,
                        null,
                        this,
                        this.internalTimerService,
                        this.trigger,
                        this.windowSerializer);

        final boolean mergingStateAvailable = this.windowMergingState != null;
        this.windowFunctionContext =
                new DefaultOneInputWindowContext<>(
                        null,
                        this.windowState,
                        this.windowProcessFunction,
                        this,
                        this.windowSerializer,
                        mergingStateAvailable);
    }

    /**
     * Prepares the state needed by a merging window assigner: exposes the window state as merging
     * state and creates the "merging-window-set" list state under {@link VoidNamespace}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void setUpMergingState() throws Exception {
        final boolean mergeable = this.windowState instanceof InternalMergingState;
        if (!mergeable && this.windowState != null) {
            throw new IllegalStateException("The window uses a merging assigner, but the window state is not mergeable.");
        }
        if (mergeable) {
            this.windowMergingState = (InternalMergingState) this.windowState;
        }

        final Class<Tuple2<W, W>> windowPairClass = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
        final TupleSerializer<Tuple2<W, W>> windowPairSerializer =
                new TupleSerializer<>(
                        windowPairClass,
                        new TypeSerializer<?>[] {this.windowSerializer, this.windowSerializer});
        final ListStateDescriptor<Tuple2<W, W>> windowPairDescriptor =
                new ListStateDescriptor<>("merging-window-set", windowPairSerializer);

        this.mergingSetsState =
                (InternalListState)
                        this.getOrCreateKeyedState(
                                VoidNamespaceSerializer.INSTANCE.createInstance(),
                                VoidNamespaceSerializer.INSTANCE,
                                windowPairDescriptor);
        this.mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
    }

    /**
     * Closes the base operator and then drops the references to the contexts created in {@code
     * open()}.
     */
    @Override
    public void close() throws Exception {
        super.close();

        // Release the per-operator contexts.
        this.windowAssignerContext = null;
        this.triggerContext = null;
        this.windowFunctionContext = null;
    }

    /**
     * Routes one record to every window chosen by the assigner.
     *
     * <p>With a merging assigner each assigned window is first added to the merging window set
     * (which may merge it with existing windows); the merged result is what the trigger sees,
     * while the user function still sees the originally assigned window. Windows that are already
     * late are skipped. If no window accepted the record and the record itself is late, the user
     * function's {@code onLateRecord} is invoked.
     *
     * @param element the incoming record.
     * @throws IllegalStateException if a merged window has no state window in the merging set.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> assignedWindows =
                this.windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        @SuppressWarnings("unchecked")
        final K currentKey = (K) this.getCurrentKey();
        boolean acceptedByAnyWindow = false;

        if (this.windowAssigner instanceof MergingWindowAssigner) {
            final MergingWindowSet<W> windowSet = this.getMergingWindowSet();
            for (W assigned : assignedWindows) {
                final W merged =
                        windowSet.addWindow(
                                assigned,
                                new MergingWindowSet.MergeFunction<W>() {
                                    @Override
                                    public void merge(
                                            W mergeResult,
                                            Collection<W> mergedWindows,
                                            W stateWindowResult,
                                            Collection<W> mergedStateWindows)
                                            throws Exception {
                                        TwoOutputWindowProcessOperator.this.handleWindowMerge(
                                                currentKey,
                                                mergeResult,
                                                mergedWindows,
                                                stateWindowResult,
                                                mergedStateWindows);
                                    }
                                });

                if (WindowUtils.isWindowLate(merged, this.windowAssigner, this.internalTimerService, this.allowedLateness)) {
                    // A late window is dropped from the set right away.
                    windowSet.retireWindow(merged);
                    continue;
                }
                acceptedByAnyWindow = true;

                final W stateWindow = windowSet.getStateWindow(merged);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + assigned + " is not in in-flight window set.");
                }
                this.deliverToWindow(element, currentKey, assigned, merged, stateWindow);
            }
            windowSet.persist();
        } else {
            for (W assigned : assignedWindows) {
                if (WindowUtils.isWindowLate(assigned, this.windowAssigner, this.internalTimerService, this.allowedLateness)) {
                    continue;
                }
                acceptedByAnyWindow = true;
                // Without merging, the assigned window is also the trigger and state window.
                this.deliverToWindow(element, currentKey, assigned, assigned, assigned);
            }
        }

        if (!acceptedByAnyWindow
                && WindowUtils.isElementLate(element, this.windowAssigner, this.allowedLateness, this.internalTimerService)) {
            this.windowProcessFunction.onLateRecord(element.getValue(), (Collector)this.mainCollector, (Collector)this.sideCollector, (TwoOutputPartitionedContext)this.partitionedContext);
        }
    }

    /**
     * Callback run when the merging window set merges windows: validates the merged window
     * against the current time, notifies the trigger, clears trigger state and cleanup timers of
     * the absorbed windows, and merges their state namespaces.
     */
    private void handleWindowMerge(
            K key,
            W mergeResult,
            Collection<W> mergedWindows,
            W stateWindowResult,
            Collection<W> mergedStateWindows)
            throws Exception {
        if (this.windowAssigner.isEventTime()) {
            if (mergeResult.maxTimestamp() + this.allowedLateness <= this.internalTimerService.currentWatermark()) {
                throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current event time: " + this.internalTimerService.currentWatermark() + " window: " + mergeResult);
            }
        } else {
            final long now = this.internalTimerService.currentProcessingTime();
            if (mergeResult.maxTimestamp() <= now) {
                throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + now + " window: " + mergeResult);
            }
        }

        this.triggerContext.setKey(key);
        this.triggerContext.setWindow(mergeResult);
        this.triggerContext.onMerge(mergedWindows);

        for (W absorbed : mergedWindows) {
            this.triggerContext.setWindow(absorbed);
            this.triggerContext.clear();
            WindowUtils.deleteCleanupTimer(absorbed, this.windowAssigner, this.triggerContext, this.allowedLateness);
        }

        this.windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
    }

    /**
     * Hands the record to the user function for {@code assignedWindow}, then runs the trigger for
     * {@code triggerWindow} and registers that window's cleanup timer. State access is scoped to
     * {@code stateWindow}.
     */
    private void deliverToWindow(
            StreamRecord<IN> element, K key, W assignedWindow, W triggerWindow, W stateWindow)
            throws Exception {
        this.windowState.setCurrentNamespace(stateWindow);

        final long outputTimestamp = assignedWindow.maxTimestamp();
        this.mainCollector.setTimestamp(outputTimestamp);
        this.sideCollector.setTimestamp(outputTimestamp);
        this.windowFunctionContext.setWindow(assignedWindow);
        this.windowProcessFunction.onRecord(element.getValue(), (Collector)this.mainCollector, (Collector)this.sideCollector, (TwoOutputPartitionedContext)this.partitionedContext, this.windowFunctionContext);

        this.triggerContext.setKey(key);
        this.triggerContext.setWindow(triggerWindow);
        final TriggerResult outcome = this.triggerContext.onElement(element);
        if (outcome.isFire()) {
            this.emitWindowContents(triggerWindow);
        }
        if (outcome.isPurge()) {
            this.windowState.clear();
        }

        WindowUtils.registerCleanupTimer(triggerWindow, this.windowAssigner, this.triggerContext, this.allowedLateness);
    }

    /**
     * Handles an event-time timer for the window stored in the timer's namespace.
     *
     * <p>The trigger is consulted first and may fire and/or purge the window. If the assigner is
     * event-time based and the timer timestamp is the window's cleanup time, all state of the
     * window is cleared. With a merging assigner the call returns early when the window has no
     * state window in the merging set.
     *
     * @param timer the timer that fired.
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        this.triggerContext.setKey(timer.getKey());
        this.triggerContext.setWindow(timer.getNamespace());
        final W timerWindow = this.triggerContext.getWindow();

        // Resolve the namespace under which this window's state is stored.
        final boolean merging = this.windowAssigner instanceof MergingWindowAssigner;
        final MergingWindowSet<W> windowSet = merging ? this.getMergingWindowSet() : null;
        W stateNamespace = timerWindow;
        if (merging) {
            stateNamespace = windowSet.getStateWindow(timerWindow);
            if (stateNamespace == null) {
                return;
            }
        }
        this.windowState.setCurrentNamespace(stateNamespace);

        final TriggerResult outcome = this.triggerContext.onEventTime(timer.getTimestamp());
        if (outcome.isFire()) {
            this.emitWindowContents(timerWindow);
        }
        if (outcome.isPurge()) {
            this.windowState.clear();
        }

        final boolean cleanupDue =
                this.windowAssigner.isEventTime()
                        && WindowUtils.isCleanupTime(timerWindow, timer.getTimestamp(), this.windowAssigner, this.allowedLateness);
        if (cleanupDue) {
            this.clearAllState(timerWindow, (AppendingState<IN, StateIterator<IN>, Iterable<IN>>)this.windowState, windowSet);
        }

        if (windowSet != null) {
            windowSet.persist();
        }
    }

    /**
     * Handles a processing-time timer for the window stored in the timer's namespace.
     *
     * <p>The trigger is consulted first and may fire and/or purge the window. If the assigner is
     * not event-time based and the timer timestamp is the window's cleanup time, all state of the
     * window is cleared. With a merging assigner the call returns early when the window has no
     * state window in the merging set.
     *
     * @param timer the timer that fired.
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        this.triggerContext.setKey(timer.getKey());
        this.triggerContext.setWindow(timer.getNamespace());
        final W timerWindow = this.triggerContext.getWindow();

        // Resolve the namespace under which this window's state is stored.
        final boolean merging = this.windowAssigner instanceof MergingWindowAssigner;
        final MergingWindowSet<W> windowSet = merging ? this.getMergingWindowSet() : null;
        W stateNamespace = timerWindow;
        if (merging) {
            stateNamespace = windowSet.getStateWindow(timerWindow);
            if (stateNamespace == null) {
                return;
            }
        }
        this.windowState.setCurrentNamespace(stateNamespace);

        final TriggerResult outcome = this.triggerContext.onProcessingTime(timer.getTimestamp());
        if (outcome.isFire()) {
            this.emitWindowContents(timerWindow);
        }
        if (outcome.isPurge()) {
            this.windowState.clear();
        }

        final boolean cleanupDue =
                !this.windowAssigner.isEventTime()
                        && WindowUtils.isCleanupTime(timerWindow, timer.getTimestamp(), this.windowAssigner, this.allowedLateness);
        if (cleanupDue) {
            this.clearAllState(timerWindow, (AppendingState<IN, StateIterator<IN>, Iterable<IN>>)this.windowState, windowSet);
        }

        if (windowSet != null) {
            windowSet.persist();
        }
    }

    /**
     * Returns the shared {@link UnsupportedProcessingTimeManager} instance instead of a working
     * processing-time manager.
     *
     * <p>NOTE(review): presumably this prevents user code from registering its own
     * processing-time timers inside a window operator; confirm against the base operator.
     */
    @Override
    protected ProcessingTimeManager getProcessingTimeManager() {
        return UnsupportedProcessingTimeManager.INSTANCE;
    }

    /**
     * Discards everything kept for {@code window}: the window state, the trigger state, and
     * (through {@code onClear}) whatever the user function wants to release. For merging
     * assigners the window is also retired from the merging set, which is then persisted.
     *
     * @param window the window being cleaned up.
     * @param windowState the state to clear; the caller has already scoped it to the window.
     * @param mergingWindows the merging window set, or {@code null} for non-merging assigners.
     */
    private void clearAllState(W window, AppendingState<IN, StateIterator<IN>, Iterable<IN>> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        this.triggerContext.clear();

        this.windowFunctionContext.setWindow(window);
        this.windowProcessFunction.onClear((Collector)this.mainCollector, (Collector)this.sideCollector, (TwoOutputPartitionedContext)this.partitionedContext, this.windowFunctionContext);

        if (mergingWindows == null) {
            // Nothing else to do for non-merging windows.
            return;
        }
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }

    /**
     * Fires {@code window}: stamps both collectors with the window's max timestamp, points the
     * window context at the window, and invokes the user function's {@code onTrigger}.
     *
     * @param window the window that fired.
     */
    private void emitWindowContents(W window) throws Exception {
        final long outputTimestamp = window.maxTimestamp();
        this.mainCollector.setTimestamp(outputTimestamp);
        this.sideCollector.setTimestamp(outputTimestamp);

        this.windowFunctionContext.setWindow(window);
        this.windowProcessFunction.onTrigger((Collector)this.mainCollector, (Collector)this.sideCollector, (TwoOutputPartitionedContext)this.partitionedContext, this.windowFunctionContext);
    }

    /**
     * Builds a new {@link MergingWindowSet} over the merging-set state.
     *
     * <p>Must only be called when the window assigner is a {@link MergingWindowAssigner}; the
     * cast fails otherwise.
     *
     * @return a new merging window set backed by {@code mergingSetsState}.
     */
    protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
        return new MergingWindowSet<W>(
                (MergingWindowAssigner<? super IN, W>) this.windowAssigner, this.mergingSetsState);
    }
}

