/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate.window.processors;

import java.time.ZoneId;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.common.SyncStateWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingSyncStateWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/**
 * Slicing window processor that stores one {@link RowData} per window end in a value state
 * named {@code "window_deduplicate"} and emits that row when the window fires.
 *
 * <p>Incoming rows are not written to state directly; they are handed to a {@link WindowBuffer}
 * created by the supplied factory, which is also given the window state. Which row is retained
 * per window is decided by that buffer implementation and is not visible in this class.
 */
public final class RowTimeSyncStateWindowDeduplicateProcessor
        implements SlicingSyncStateWindowProcessor<Long> {
    private static final long serialVersionUID = 1L;

    private final WindowBuffer.Factory bufferFactory;
    private final TypeSerializer<RowData> inputSerializer;
    /** Position of the window-end field (read as a {@code long}) within each input row. */
    private final int windowEndIndex;
    private final ZoneId shiftTimeZone;

    /** Highest progress value seen so far; reset to {@link Long#MIN_VALUE} in {@link #open}. */
    private transient long currentProgress;
    private transient SyncStateWindowProcessor.SyncStateContext<Long> ctx;
    private transient WindowTimerService<Long> windowTimerService;
    private transient WindowBuffer windowBuffer;
    /** Value state namespaced by window end; holds the row emitted by {@link #fireWindow}. */
    private transient WindowValueState<Long> windowState;

    /**
     * @param inputSerializer serializer used for the rows stored in the window state
     * @param bufferFactory factory for the buffer that receives incoming rows
     * @param windowEndIndex index of the window-end field in the input row
     * @param shiftTimeZone time zone passed to the timer service, the buffer and the
     *     window-fired check
     */
    public RowTimeSyncStateWindowDeduplicateProcessor(
            TypeSerializer<RowData> inputSerializer,
            WindowBuffer.Factory bufferFactory,
            int windowEndIndex,
            ZoneId shiftTimeZone) {
        this.inputSerializer = inputSerializer;
        this.bufferFactory = bufferFactory;
        this.windowEndIndex = windowEndIndex;
        this.shiftTimeZone = shiftTimeZone;
    }

    /**
     * Creates the window state, the timer service wrapper and the window buffer from the given
     * context, and resets the tracked progress to {@link Long#MIN_VALUE}.
     */
    @Override
    public void open(SyncStateWindowProcessor.SyncStateContext<Long> context) throws Exception {
        this.ctx = context;

        ValueStateDescriptor<RowData> stateDescriptor =
                new ValueStateDescriptor<>("window_deduplicate", this.inputSerializer);
        // The window end (a Long) is used as the state namespace.
        ValueState<RowData> state =
                this.ctx
                        .getKeyedStateBackend()
                        .getOrCreateKeyedState(LongSerializer.INSTANCE, stateDescriptor);

        // WindowValueState needs the namespace-aware internal view of the state. The downcast
        // cannot be checked by the compiler, so the suppression is limited to this one local.
        @SuppressWarnings("unchecked")
        InternalValueState<RowData, Long, RowData> internalState =
                (InternalValueState<RowData, Long, RowData>) state;

        this.windowTimerService =
                new SlicingWindowTimerServiceImpl(this.ctx.getTimerService(), this.shiftTimeZone);
        this.windowState = new WindowValueState<>(internalState);
        // NOTE(review): the boolean argument is presumably the "is event time" flag of
        // WindowBuffer.Factory#create - confirm against that interface.
        this.windowBuffer =
                this.bufferFactory.create(
                        this.ctx.getOperatorOwner(),
                        this.ctx.getMemoryManager(),
                        this.ctx.getMemorySize(),
                        this.ctx.getRuntimeContext(),
                        this.windowTimerService,
                        this.ctx.getKeyedStateBackend(),
                        this.windowState,
                        true,
                        this.shiftTimeZone);
        this.currentProgress = Long.MIN_VALUE;
    }

    /** Overwrites the tracked progress with the given watermark unconditionally. */
    @Override
    public void initializeWatermark(long watermark) {
        this.currentProgress = watermark;
    }

    /**
     * Reads the slice end from {@code element} and either skips the row or buffers it.
     *
     * @return {@code true} if the window for this slice end is already considered fired at the
     *     current progress, in which case the row is not buffered (presumably signalling a
     *     dropped late record to the caller - confirm against the interface contract);
     *     {@code false} after the row has been added to the buffer
     */
    @Override
    public boolean processElement(RowData key, RowData element) throws Exception {
        long sliceEnd = element.getLong(this.windowEndIndex);
        if (TimeWindowUtil.isWindowFired(sliceEnd, this.currentProgress, this.shiftTimeZone)) {
            return true;
        }
        this.windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }

    /**
     * Moves the tracked progress forward and forwards it to the buffer; progress values that
     * are not strictly greater than the current one are ignored.
     */
    @Override
    public void advanceProgress(long progress) throws Exception {
        if (progress > this.currentProgress) {
            this.currentProgress = progress;
            this.windowBuffer.advanceProgress(this.currentProgress);
        }
    }

    /** Flushes the window buffer before a checkpoint is taken. */
    @Override
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    /** Clears the state entry stored under {@code windowEnd}. */
    @Override
    public void clearWindow(long timerTimestamp, Long windowEnd) throws Exception {
        this.windowState.clear(windowEnd);
    }

    /** Closes the window buffer if {@link #open} got far enough to create it. */
    @Override
    public void close() throws Exception {
        if (this.windowBuffer != null) {
            this.windowBuffer.close();
        }
    }

    /** Windows are identified by their end timestamp, so a long serializer is returned. */
    @Override
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    /** Emits the row stored under {@code windowEnd}, if any; emits nothing when none exists. */
    @Override
    public void fireWindow(long timerTimestamp, Long windowEnd) throws Exception {
        RowData data = this.windowState.value(windowEnd);
        if (data != null) {
            this.ctx.output(data);
        }
    }
}

