package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import java.time.ZoneId;
import java.util.TimeZone;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.class */
/**
 * Base class for slicing window aggregate processors whose window namespace is a {@code Long}
 * (the slice end value produced by the {@link SliceAssigner}).
 *
 * <p>The class wires together the per-window accumulator state, the generated aggregation
 * handler, a {@link WindowBuffer} that receives incoming elements, and the timer service used to
 * register window timers. Subclasses decide, via {@link #sliceStateMergeTarget(long)}, which
 * slice a late-but-still-usable element is accumulated into.
 */
public abstract class AbstractWindowAggProcessor implements SlicingWindowProcessor<Long> {
    private static final long serialVersionUID = 1L;

    /** Generated aggregation handler; instantiated in {@link #open}. */
    protected final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;

    /** Factory for the {@link WindowBuffer} created in {@link #open}. */
    protected final WindowBuffer.Factory windowBufferFactory;

    /** Factory for the combine function handed to the window buffer. */
    protected final WindowCombineFunction.Factory combineFactory;

    /** Assigns elements to slices and reports which slices expire with a window. */
    protected final SliceAssigner sliceAssigner;

    /** Serializer of the accumulator rows kept in the "window-aggs" state. */
    protected final TypeSerializer<RowData> accSerializer;

    /** Cached {@code sliceAssigner.isEventTime()}. */
    protected final boolean isEventTime;

    /** Cached {@code sliceAssigner.getSliceEndInterval()}; step used when searching for an unfired slice end. */
    protected final long windowInterval;

    /** Time zone passed to the timer service, the window buffer and {@link TimeWindowUtil}. */
    protected final ZoneId shiftTimeZone;

    /** Whether {@link #shiftTimeZone} uses daylight saving time. */
    protected final boolean useDayLightSaving;

    /** Latest progress seen by {@link #advanceProgress(long)}; {@code Long.MIN_VALUE} after {@link #open}. */
    protected transient long currentProgress;

    /** Progress value at or above which the window buffer is advanced again. */
    private transient long nextTriggerProgress;

    protected transient SlicingWindowProcessor.Context<Long> ctx;
    protected transient ClockService clockService;
    protected transient WindowTimerService<Long> windowTimerService;
    protected transient NamespaceAggsHandleFunction<Long> aggregator;
    protected transient WindowBuffer windowBuffer;

    /** Accumulator state, namespaced by the {@code Long} window value. */
    protected transient WindowValueState<Long> windowState;

    /** Reused output row joining the current key with an aggregation result. */
    protected transient JoinedRowData reuseOutput;

    /**
     * Creates the processor and caches the assigner- and time-zone-derived settings.
     *
     * @param generatedNamespaceAggsHandleFunction generated aggregation handler
     * @param factory factory of the window buffer
     * @param factory2 factory of the combine function used by the window buffer
     * @param sliceAssigner assigner that maps elements to slice ends
     * @param typeSerializer serializer for accumulator rows
     * @param zoneId time zone used when evaluating and registering window timers
     */
    public AbstractWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction, WindowBuffer.Factory factory, WindowCombineFunction.Factory factory2, SliceAssigner sliceAssigner, TypeSerializer<RowData> typeSerializer, ZoneId zoneId) {
        this.genAggsHandler = generatedNamespaceAggsHandleFunction;
        this.windowBufferFactory = factory;
        this.combineFactory = factory2;
        this.sliceAssigner = sliceAssigner;
        this.accSerializer = typeSerializer;
        this.isEventTime = sliceAssigner.isEventTime();
        this.windowInterval = sliceAssigner.getSliceEndInterval();
        this.shiftTimeZone = zoneId;
        this.useDayLightSaving = TimeZone.getTimeZone(zoneId).useDaylightTime();
    }

    /**
     * Initializes all runtime (transient) members from the given context and resets the progress
     * markers to {@code Long.MIN_VALUE}.
     *
     * @param context runtime context supplying the state backend, timer service, memory manager
     *     and runtime context
     * @throws Exception if any of the underlying components fails to initialize
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public void open(SlicingWindowProcessor.Context<Long> context) throws Exception {
        this.ctx = context;
        LongSerializer longSerializer = LongSerializer.INSTANCE;
        // NOTE(review): the raw cast is kept exactly as found; the parameter type expected by
        // the WindowValueState constructor is not visible from this file.
        this.windowState = new WindowValueState<>((ValueState) this.ctx.getKeyedStateBackend().getOrCreateKeyedState(longSerializer, new ValueStateDescriptor("window-aggs", this.accSerializer)));
        this.clockService = ClockService.of(this.ctx.getTimerService());
        this.windowTimerService = new WindowTimerServiceImpl(this.ctx.getTimerService(), this.shiftTimeZone);
        this.aggregator = (NamespaceAggsHandleFunction<Long>) this.genAggsHandler.newInstance(this.ctx.getRuntimeContext().getUserCodeClassLoader());
        this.aggregator.open(new PerWindowStateDataViewStore(this.ctx.getKeyedStateBackend(), longSerializer, this.ctx.getRuntimeContext()));
        this.windowBuffer = this.windowBufferFactory.create(this.ctx.getOperatorOwner(), this.ctx.getMemoryManager(), this.ctx.getMemorySize(), this.combineFactory.create(this.ctx.getRuntimeContext(), this.windowTimerService, this.ctx.getKeyedStateBackend(), this.windowState, this.isEventTime), this.shiftTimeZone);
        this.reuseOutput = new JoinedRowData();
        this.currentProgress = Long.MIN_VALUE;
        this.nextTriggerProgress = Long.MIN_VALUE;
    }

    /**
     * Assigns the element to a slice and adds it to the window buffer.
     *
     * <ul>
     *   <li>Processing time: a processing-time timer is registered for the slice end and the
     *       element is buffered under that slice.
     *   <li>Event time, slice not yet fired: the element is buffered under its slice.
     *   <li>Event time, slice fired but its last window not fired: the element is buffered under
     *       {@link #sliceStateMergeTarget(long)} and an event-time timer is registered for the
     *       first slice end (stepping by {@link #windowInterval}) that has not fired.
     *   <li>Event time, last window already fired: nothing is buffered.
     * </ul>
     *
     * @param rowData first argument handed to the window buffer (presumably the key — confirm
     *     against the interface contract)
     * @param rowData2 the element that is assigned to a slice and buffered
     * @return {@code true} only when the element was not buffered because the last window of its
     *     slice has already fired; {@code false} otherwise
     * @throws Exception if slice assignment, buffering or timer registration fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public boolean processElement(RowData rowData, RowData rowData2) throws Exception {
        final long sliceEnd = this.sliceAssigner.assignSliceEnd(rowData2, this.clockService);

        if (!this.isEventTime) {
            this.windowTimerService.registerProcessingTimeWindowTimer(Long.valueOf(sliceEnd));
            this.windowBuffer.addElement(rowData, sliceEnd, rowData2);
            return false;
        }

        if (!TimeWindowUtil.isWindowFired(sliceEnd, this.currentProgress, this.shiftTimeZone)) {
            this.windowBuffer.addElement(rowData, sliceEnd, rowData2);
            return false;
        }

        // The assigned slice has already fired; the element is only usable if the last window
        // reported for this slice has not fired yet.
        final long lastWindowEnd = this.sliceAssigner.getLastWindowEnd(sliceEnd);
        if (TimeWindowUtil.isWindowFired(lastWindowEnd, this.currentProgress, this.shiftTimeZone)) {
            return true;
        }

        this.windowBuffer.addElement(rowData, sliceStateMergeTarget(sliceEnd), rowData2);

        // Find the first slice end at or after sliceEnd that has not fired and register a timer
        // for it.
        long firstUnfired = sliceEnd;
        while (TimeWindowUtil.isWindowFired(firstUnfired, this.currentProgress, this.shiftTimeZone)) {
            firstUnfired += this.windowInterval;
        }
        this.windowTimerService.registerEventTimeWindowTimer(Long.valueOf(firstUnfired));
        return false;
    }

    /**
     * Returns the slice under which an element is buffered when its own slice has already fired
     * but the last window of that slice has not.
     *
     * @param j the slice end originally assigned to the element
     * @return the slice end to accumulate the element into
     * @throws Exception if the target cannot be determined
     */
    protected abstract long sliceStateMergeTarget(long j) throws Exception;

    /**
     * Records new progress and, once it reaches the next trigger point, advances the window
     * buffer and recomputes that trigger point. Progress values that do not exceed the current
     * progress are ignored.
     *
     * @param j the new progress value
     * @throws Exception if advancing the window buffer fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public void advanceProgress(long j) throws Exception {
        if (j <= this.currentProgress) {
            return;
        }
        this.currentProgress = j;
        if (this.currentProgress >= this.nextTriggerProgress) {
            this.windowBuffer.advanceProgress(this.currentProgress);
            this.nextTriggerProgress = TimeWindowUtil.getNextTriggerWatermark(this.currentProgress, this.windowInterval, this.shiftTimeZone, this.useDayLightSaving);
        }
    }

    /**
     * Flushes the window buffer before a checkpoint.
     *
     * @throws Exception if flushing fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    /**
     * Clears the accumulator state and aggregator resources of every slice that the assigner
     * reports as expired for the given window.
     *
     * @param l the window value whose expired slices are cleaned up
     * @throws Exception if clearing state or aggregator cleanup fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public void clearWindow(Long l) throws Exception {
        for (Long expiredSlice : this.sliceAssigner.expiredSlices(l.longValue())) {
            this.windowState.clear(expiredSlice);
            this.aggregator.cleanup(expiredSlice);
        }
    }

    /**
     * Closes the aggregator and the window buffer. Both are always attempted, so a failure while
     * closing the aggregator no longer leaves the window buffer open. The first failure is
     * rethrown; a failure from the second close is attached to it as a suppressed exception.
     *
     * @throws Exception the first exception raised while closing either component
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public void close() throws Exception {
        Exception failure = null;
        if (this.aggregator != null) {
            try {
                this.aggregator.close();
            } catch (Exception e) {
                failure = e;
            }
        }
        if (this.windowBuffer != null) {
            try {
                this.windowBuffer.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns the serializer for the {@code Long} window namespace.
     *
     * @return {@link LongSerializer#INSTANCE}
     */
    @Override // org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits one result row consisting of the state backend's current key joined with the given
     * row. The same {@link JoinedRowData} instance is reused for every call.
     *
     * @param rowData the row to append to the current key
     */
    public void collect(RowData rowData) {
        this.reuseOutput.replace((RowData) this.ctx.getKeyedStateBackend().getCurrentKey(), rowData);
        this.ctx.output(this.reuseOutput);
    }
}
