package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import java.time.ZoneId;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.operators.window.tvf.common.SyncStateWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingSyncStateWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

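/**
 * Window aggregate processor that implements {@link UnslicingSyncStateWindowProcessor} for
 * {@link TimeWindow}s.
 *
 * <p>Windows are assigned through an {@link UnsliceAssigner} and maintained by a
 * {@link MergingWindowProcessFunction}. Whether a window fires is decided by an
 * {@code afterEndOfWindow} {@link Trigger} taken from {@link EventTimeTriggers} or
 * {@link ProcessingTimeTriggers}, depending on the time domain of the assigner.
 */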
public class UnsliceSyncStateWindowAggProcessor extends AbstractSyncStateWindowAggProcessor<TimeWindow> implements UnslicingSyncStateWindowProcessor<TimeWindow> {
    private final UnsliceAssigner<TimeWindow> unsliceAssigner;
    private final Trigger<TimeWindow> trigger;
    private transient MetricGroup metrics;
    protected transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;
    private transient TriggerContextImpl triggerContext;

    /**
     * Trigger context handed to the enclosing processor's {@code trigger}.
     *
     * <p>The context is mutable and reused: callers select the window to operate on with
     * {@link #setWindow(TimeWindow)} (and, for merges, {@link #setMergedWindows(Collection)})
     * before invoking a trigger callback. All timer registrations and deletions are issued for
     * that currently selected window.
     */
    public class TriggerContextImpl implements Trigger.OnMergeContext {
        /** Window that trigger callbacks and timer operations currently refer to. */
        private TimeWindow window;

        /** Windows that were merged into {@link #window}; only read by {@link #mergePartitionedState}. */
        private Collection<TimeWindow> mergedWindows;

        private TriggerContextImpl() {
        }

        /** Opens the trigger, passing this context to it. */
        public void open() throws Exception {
            trigger.open(this);
        }

        /** Returns the metric group captured by the enclosing processor in {@code open}. */
        @Override
        public MetricGroup getMetricGroup() {
            return metrics;
        }

        /**
         * Forwards a processing-time timer to the trigger for the current window.
         *
         * @param j the timer timestamp
         * @return the trigger's decision
         */
        public boolean onProcessingTime(long j) throws Exception {
            return trigger.onProcessingTime(j, window);
        }

        /**
         * Forwards an event-time timer to the trigger for the current window.
         *
         * @param j the timer timestamp
         * @return the trigger's decision
         */
        public boolean onEventTime(long j) throws Exception {
            return trigger.onEventTime(j, window);
        }

        /** Notifies the trigger that windows were merged into the current window. */
        public void onMerge() throws Exception {
            trigger.onMerge(window, this);
        }

        /** Selects the window that subsequent callbacks and timer operations refer to. */
        public void setWindow(TimeWindow timeWindow) {
            this.window = timeWindow;
        }

        /** Records the windows that were merged into the current window. */
        public void setMergedWindows(Collection<TimeWindow> collection) {
            this.mergedWindows = collection;
        }

        @Override
        public long getCurrentProcessingTime() {
            return ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().currentWatermark();
        }

        /** Registers a processing-time timer at {@code j} for the current window. */
        @Override
        public void registerProcessingTimeTimer(long j) {
            ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().registerProcessingTimeTimer(window, j);
        }

        /** Registers an event-time timer at {@code j} for the current window. */
        @Override
        public void registerEventTimeTimer(long j) {
            ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().registerEventTimeTimer(window, j);
        }

        /** Deletes the processing-time timer at {@code j} for the current window. */
        @Override
        public void deleteProcessingTimeTimer(long j) {
            ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().deleteProcessingTimeTimer(window, j);
        }

        /** Deletes the event-time timer at {@code j} for the current window. */
        @Override
        public void deleteEventTimeTimer(long j) {
            ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().deleteEventTimeTimer(window, j);
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        /** Asks the trigger to clear whatever it holds for the current window. */
        public void clear() throws Exception {
            trigger.clear(window);
        }

        /**
         * Merges the state described by {@code stateDescriptor} from the merged windows into the
         * current window. Does nothing when no merged windows have been recorded.
         *
         * @param stateDescriptor descriptor of the state to merge
         * @throws RuntimeException wrapping any failure, including the
         *     {@link IllegalArgumentException} raised when the state is not an
         *     {@link InternalMergingState}
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (mergedWindows == null || mergedWindows.isEmpty()) {
                return;
            }
            try {
                // Hold the result as plain State so that the instanceof guard below is a real
                // check rather than dead code behind an implicit cast.
                State state =
                        ((SyncStateWindowProcessor.SyncStateContext) ctx)
                                .getKeyedStateBackend()
                                .getOrCreateKeyedState(createWindowSerializer(), stateDescriptor);
                if (!(state instanceof InternalMergingState)) {
                    throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                }
                ((InternalMergingState) state).mergeNamespaces(window, mergedWindows);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        /**
         * Looks up state under {@link VoidNamespace}, i.e. not scoped to the current window.
         *
         * @throws RuntimeException wrapping any failure of the state backend
         */
        @Override
        @SuppressWarnings("unchecked")
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) ((SyncStateWindowProcessor.SyncStateContext) ctx).getKeyedStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }
    }

    /**
     * Context handed to {@code windowFunction}. It exposes the enclosing processor's timer
     * service, window state, aggregator and trigger context to the merging window function.
     */
    public class WindowContextImpl implements MergingWindowProcessFunction.MergingContext<RowData, TimeWindow> {
        private WindowContextImpl() {
        }

        @Override
        public long currentProcessingTime() {
            return ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService().currentWatermark();
        }

        @Override
        public ZoneId getShiftTimeZone() {
            return shiftTimeZone;
        }

        /** Reads the accumulators stored in window state for {@code timeWindow}. */
        @Override
        public RowData getWindowAccumulators(TimeWindow timeWindow) throws Exception {
            return windowState.value(timeWindow);
        }

        /** Stores {@code rowData} as the accumulators of {@code timeWindow} in window state. */
        @Override
        public void setWindowAccumulators(TimeWindow timeWindow, RowData rowData) throws Exception {
            windowState.update(timeWindow, rowData);
        }

        /** Clears the window state of {@code timeWindow} and lets the aggregator clean up for it. */
        @Override
        public void clearWindowState(TimeWindow timeWindow) throws Exception {
            windowState.clear(timeWindow);
            aggregator.cleanup(timeWindow);
        }

        /** Intentionally a no-op: this processor does nothing here. */
        @Override
        public void clearPreviousState(TimeWindow timeWindow) throws Exception {
        }

        /** Selects {@code timeWindow} on the trigger context and clears the trigger for it. */
        @Override
        public void clearTrigger(TimeWindow timeWindow) throws Exception {
            triggerContext.setWindow(timeWindow);
            triggerContext.clear();
        }

        /**
         * Deletes the timer registered at the max timestamp of {@code timeWindow}, converted with
         * the shift time zone. Nothing is deleted when the converted time is
         * {@link Long#MAX_VALUE}.
         *
         * <p>NOTE(review): the deletion goes through the trigger context, which issues it for the
         * window it currently holds; {@code timeWindow} is not set on it here. Confirm that
         * callers have selected the intended window beforehand.
         */
        @Override
        public void deleteCleanupTimer(TimeWindow timeWindow) throws Exception {
            long cleanupTime = TimeWindowUtil.toEpochMillsForTimer(timeWindow.maxTimestamp(), shiftTimeZone);
            if (cleanupTime == Long.MAX_VALUE) {
                return;
            }
            if (unsliceAssigner.isEventTime()) {
                triggerContext.deleteEventTimeTimer(cleanupTime);
            } else {
                triggerContext.deleteProcessingTimeTimer(cleanupTime);
            }
        }

        /**
         * Propagates a window merge to the trigger: selects the merge result, records the merged
         * windows and invokes the trigger's merge callback.
         *
         * <p>The erased bridge overload is generated by the compiler and is therefore not declared
         * explicitly.
         *
         * @param timeWindow the window that results from the merge
         * @param collection the windows that were merged into {@code timeWindow}
         */
        @Override
        public void onMerge(TimeWindow timeWindow, Collection<TimeWindow> collection) throws Exception {
            triggerContext.setWindow(timeWindow);
            triggerContext.setMergedWindows(collection);
            triggerContext.onMerge();
        }

        /**
         * Looks up state under {@link VoidNamespace}, i.e. not scoped to a window.
         *
         * @throws NullPointerException if {@code stateDescriptor} is {@code null}
         */
        @Override
        @SuppressWarnings("unchecked")
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            Objects.requireNonNull(stateDescriptor, "The state properties must not be null");
            return (S) ((SyncStateWindowProcessor.SyncStateContext) ctx).getKeyedStateBackend().getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
        }

        /** Returns the key currently set on the keyed state backend. */
        @Override
        public RowData currentKey() {
            return (RowData) ((SyncStateWindowProcessor.SyncStateContext) ctx).getKeyedStateBackend().getCurrentKey();
        }

        /** Returns a new default accumulator-merging consumer bound to this context and the aggregator. */
        @Override
        public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> getWindowStateMergingConsumer() {
            return new MergingWindowProcessFunction.DefaultAccMergingConsumer(this, aggregator);
        }
    }

    /**
     * Creates the processor and picks the trigger that matches the assigner's time domain.
     *
     * @param generatedNamespaceAggsHandleFunction generated aggregate handler, forwarded to the superclass
     * @param unsliceAssigner assigner for the windows; its {@code isEventTime()} decides the time domain
     * @param typeSerializer serializer forwarded to the superclass
     * @param i value forwarded unchanged to the superclass constructor
     * @param zoneId shift time zone forwarded to the superclass
     */
    public UnsliceSyncStateWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<TimeWindow> generatedNamespaceAggsHandleFunction, UnsliceAssigner<TimeWindow> unsliceAssigner, TypeSerializer<RowData> typeSerializer, int i, ZoneId zoneId) {
        super(generatedNamespaceAggsHandleFunction, unsliceAssigner, typeSerializer, unsliceAssigner.isEventTime(), i, zoneId);
        this.unsliceAssigner = unsliceAssigner;
        // Both factories provide an "after end of window" trigger; only the time domain differs.
        this.trigger =
                this.isEventTime
                        ? EventTimeTriggers.afterEndOfWindow()
                        : ProcessingTimeTriggers.afterEndOfWindow();
    }

    /**
     * Opens the processor: captures the metric group, builds the merging window function and
     * opens the trigger context and the window function, in that order.
     *
     * @param syncStateContext the context supplied by the operator
     */
    @Override
    public void open(SyncStateWindowProcessor.SyncStateContext<TimeWindow> syncStateContext) throws Exception {
        super.open((SyncStateWindowProcessor.SyncStateContext) syncStateContext);
        metrics = syncStateContext.getRuntimeContext().getMetricGroup();
        windowFunction =
                new MergingWindowProcessFunction<>(
                        unsliceAssigner.getMergingWindowAssigner(),
                        aggregator,
                        unsliceAssigner.getMergingWindowAssigner().getWindowSerializer(new ExecutionConfig()),
                        0L);

        // The trigger context must be in place before the window function is opened.
        final TriggerContextImpl newTriggerContext = new TriggerContextImpl();
        triggerContext = newTriggerContext;
        newTriggerContext.open();

        final WindowContextImpl windowContext = new WindowContextImpl();
        windowFunction.open(windowContext);
    }

    /**
     * Folds one input row into the accumulators of its state namespace and registers a timer
     * for the actual window the row belongs to.
     *
     * @param rowData the key row; not read by this method
     * @param rowData2 the input row; accumulated or retracted depending on its message kind
     * @return {@code true} if the assigner produced no state namespace for the row (nothing was
     *     aggregated), {@code false} otherwise
     * @throws IllegalArgumentException if exactly one of state namespace and actual window is present
     */
    @Override
    public boolean processElement(RowData rowData, RowData rowData2) throws Exception {
        final Optional<TimeWindow> stateNamespace =
                unsliceAssigner.assignStateNamespace(rowData2, clockService, windowFunction);
        final boolean dropped = !stateNamespace.isPresent();

        if (!dropped) {
            final TimeWindow namespace = stateNamespace.get();
            RowData acc = windowState.value(namespace);
            if (acc == null) {
                acc = aggregator.createAccumulators();
            }
            aggregator.setAccumulators(namespace, acc);
            if (RowDataUtil.isAccumulateMsg(rowData2)) {
                aggregator.accumulate(rowData2);
            } else {
                aggregator.retract(rowData2);
            }
            windowState.update(namespace, aggregator.getAccumulators());
        }

        final Optional<TimeWindow> actualWindow =
                unsliceAssigner.assignActualWindow(rowData2, clockService, windowFunction);
        // Either both are present or both are absent.
        Preconditions.checkArgument(stateNamespace.isPresent() == actualWindow.isPresent());

        if (actualWindow.isPresent()) {
            final TimeWindow window = actualWindow.get();
            triggerContext.setWindow(window);
            final long timerTime = TimeWindowUtil.toEpochMillsForTimer(window.maxTimestamp(), shiftTimeZone);
            if (isEventTime) {
                triggerContext.registerEventTimeTimer(timerTime);
            } else {
                triggerContext.registerProcessingTimeTimer(timerTime);
            }
        }
        return dropped;
    }

    /**
     * Computes the aggregate result for {@code timeWindow} and emits it if the trigger fires and
     * the empty supplier does not report {@code true}.
     *
     * @param j the timestamp of the timer that caused this call
     * @param timeWindow the window to fire
     */
    @Override
    public void fireWindow(long j, TimeWindow timeWindow) throws Exception {
        windowFunction.prepareAggregateAccumulatorForEmit(timeWindow);
        final RowData aggResult = aggregator.getValue(timeWindow);

        triggerContext.setWindow(timeWindow);
        final boolean fired;
        if (isEventTime) {
            fired = triggerContext.onEventTime(j);
        } else {
            fired = triggerContext.onProcessingTime(j);
        }

        // The empty supplier is only consulted once the trigger has fired.
        if (fired && !emptySupplier.get().booleanValue()) {
            collect(aggResult);
        }
    }

    /**
     * Delegates cleanup of {@code timeWindow} to the merging window function.
     *
     * @param j the timestamp passed on to the window function's cleanup check
     * @param timeWindow the window to clean up if needed
     */
    @Override
    public void clearWindow(long j, TimeWindow timeWindow) throws Exception {
        windowFunction.cleanWindowIfNeeded(timeWindow, j);
    }

    /**
     * Intentionally a no-op: this processor performs no work when progress advances.
     *
     * @param j the new progress value; ignored
     */
    @Override
    public void advanceProgress(long j) throws Exception {
    }

    /** Intentionally a no-op: this processor does nothing before a checkpoint. */
    @Override
    public void prepareCheckpoint() throws Exception {
    }

    /**
     * Creates the window serializer of the assigner's merging window assigner, using a fresh
     * {@link ExecutionConfig}.
     *
     * @return a serializer for {@link TimeWindow}
     */
    @Override
    public TypeSerializer<TimeWindow> createWindowSerializer() {
        final ExecutionConfig executionConfig = new ExecutionConfig();
        return unsliceAssigner.getMergingWindowAssigner().getWindowSerializer(executionConfig);
    }

    /**
     * Creates a new {@link UnslicingWindowTimerServiceImpl} over the context's timer service and
     * the shift time zone.
     *
     * @return a new window timer service; a fresh instance is built on every call
     */
    @Override
    protected WindowTimerService<TimeWindow> getWindowTimerService() {
        return new UnslicingWindowTimerServiceImpl(
                ((SyncStateWindowProcessor.SyncStateContext) ctx).getTimerService(),
                shiftTimeZone);
    }
}
