package org.apache.flink.table.runtime.operators.window.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.MergingWindowSet;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/internal/MergingWindowProcessFunction.class */
public class MergingWindowProcessFunction<K, W extends Window> extends InternalWindowProcessFunction<K, W> {
    private static final long serialVersionUID = -2866771637946397223L;
    private final MergingWindowAssigner<W> windowAssigner;
    private final TypeSerializer<W> windowSerializer;
    private transient MergingWindowSet<W> mergingWindows;
    private transient MergingWindowProcessFunction<K, W>.MergingFunctionImpl mergingFunction;
    private List<W> reuseActualWindows;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/internal/MergingWindowProcessFunction$MergingFunctionImpl.class */
    private class MergingFunctionImpl implements MergingWindowSet.MergeFunction<W> {
        private MergingFunctionImpl() {
        }

        public void merge(W w, Collection<W> collection, W w2, Collection<W> collection2) throws Exception {
            if (MergingWindowProcessFunction.this.windowAssigner.isEventTime() && w.maxTimestamp() + MergingWindowProcessFunction.this.allowedLateness <= MergingWindowProcessFunction.this.ctx.currentWatermark()) {
                throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + MergingWindowProcessFunction.this.ctx.currentWatermark() + " window: " + w);
            }
            if (!MergingWindowProcessFunction.this.windowAssigner.isEventTime() && w.maxTimestamp() <= MergingWindowProcessFunction.this.ctx.currentProcessingTime()) {
                throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + MergingWindowProcessFunction.this.ctx.currentProcessingTime() + " window: " + w);
            }
            MergingWindowProcessFunction.this.ctx.onMerge(w, collection2);
            for (W w3 : collection) {
                MergingWindowProcessFunction.this.ctx.clearTrigger(w3);
                MergingWindowProcessFunction.this.ctx.deleteCleanupTimer(w3);
            }
            if (collection2.isEmpty()) {
                return;
            }
            BaseRow windowAccumulators = MergingWindowProcessFunction.this.ctx.getWindowAccumulators(w2);
            if (windowAccumulators == null) {
                windowAccumulators = MergingWindowProcessFunction.this.windowAggregator.createAccumulators();
            }
            MergingWindowProcessFunction.this.windowAggregator.setAccumulators(w2, windowAccumulators);
            for (W w4 : collection2) {
                BaseRow windowAccumulators2 = MergingWindowProcessFunction.this.ctx.getWindowAccumulators(w4);
                if (windowAccumulators2 != null) {
                    MergingWindowProcessFunction.this.windowAggregator.merge(w4, windowAccumulators2);
                }
                MergingWindowProcessFunction.this.ctx.clearWindowState(w4);
                MergingWindowProcessFunction.this.ctx.clearPreviousState(w4);
            }
            MergingWindowProcessFunction.this.ctx.setWindowAccumulators(w2, MergingWindowProcessFunction.this.windowAggregator.getAccumulators());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.flink.table.runtime.operators.window.internal.MergingWindowSet.MergeFunction
        public /* bridge */ /* synthetic */ void merge(Object obj, Collection collection, Object obj2, Collection collection2) throws Exception {
            merge((Collection) obj, (Collection<Collection>) collection, (Collection) obj2, (Collection<Collection>) collection2);
        }
    }

    public MergingWindowProcessFunction(MergingWindowAssigner<W> mergingWindowAssigner, NamespaceAggsHandleFunctionBase<W> namespaceAggsHandleFunctionBase, TypeSerializer<W> typeSerializer, long j) {
        super(mergingWindowAssigner, namespaceAggsHandleFunctionBase, j);
        this.windowAssigner = mergingWindowAssigner;
        this.windowSerializer = typeSerializer;
    }

    @Override // org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction
    public void open(InternalWindowProcessFunction.Context<K, W> context) throws Exception {
        super.open(context);
        this.mergingWindows = new MergingWindowSet<>(this.windowAssigner, context.getPartitionedState(new MapStateDescriptor("session-window-mapping", this.windowSerializer, this.windowSerializer)));
        this.mergingFunction = new MergingFunctionImpl();
    }

    @Override // org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction
    public Collection<W> assignStateNamespace(BaseRow baseRow, long j) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(baseRow, j);
        this.mergingWindows.initializeCache(this.ctx.currentKey());
        this.reuseActualWindows = new ArrayList(1);
        Iterator<W> it = assignWindows.iterator();
        while (it.hasNext()) {
            W addWindow = this.mergingWindows.addWindow(it.next(), this.mergingFunction);
            if (isWindowLate(addWindow)) {
                this.mergingWindows.retireWindow(addWindow);
            } else {
                this.reuseActualWindows.add(addWindow);
            }
        }
        ArrayList arrayList = new ArrayList(this.reuseActualWindows.size());
        Iterator<W> it2 = this.reuseActualWindows.iterator();
        while (it2.hasNext()) {
            arrayList.add(this.mergingWindows.getStateWindow(it2.next()));
        }
        return arrayList;
    }

    @Override // org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction
    public Collection<W> assignActualWindows(BaseRow baseRow, long j) throws Exception {
        return this.reuseActualWindows;
    }

    @Override // org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction
    public void prepareAggregateAccumulatorForEmit(W w) throws Exception {
        W stateWindow = this.mergingWindows.getStateWindow(w);
        BaseRow windowAccumulators = this.ctx.getWindowAccumulators(stateWindow);
        if (windowAccumulators == null) {
            windowAccumulators = this.windowAggregator.createAccumulators();
        }
        this.windowAggregator.setAccumulators(stateWindow, windowAccumulators);
    }

    @Override // org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction
    public void cleanWindowIfNeeded(W w, long j) throws Exception {
        if (isCleanupTime(w, j)) {
            this.ctx.clearTrigger(w);
            this.ctx.clearWindowState(this.mergingWindows.getStateWindow(w));
            this.mergingWindows.initializeCache(this.ctx.currentKey());
            this.mergingWindows.retireWindow(w);
        }
    }
}
