package org.apache.flink.table.runtime.operators.window.tvf.common;

import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.SyncStateWindowProcessor;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.class */
public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {
    private static final long serialVersionUID = 1;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final SyncStateWindowProcessor<W> windowProcessor;
    private final boolean isEventTime;
    private transient TimestampedCollector<RowData> collector;
    private transient InternalTimerService<W> internalTimerService;
    private transient long lastTriggeredProcessingTime;
    private transient ListState<Long> watermarkState;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator$WindowProcessorSyncStateContext.class */
    private static final class WindowProcessorSyncStateContext<W> implements SyncStateWindowProcessor.SyncStateContext<W> {
        private final Object operatorOwner;
        private final MemoryManager memoryManager;
        private final long memorySize;
        private final InternalTimerService<W> timerService;
        private final KeyedStateBackend<RowData> keyedStateBackend;
        private final Output<RowData> collector;
        private final RuntimeContext runtimeContext;

        private WindowProcessorSyncStateContext(Object obj, MemoryManager memoryManager, long j, InternalTimerService<W> internalTimerService, KeyedStateBackend<RowData> keyedStateBackend, Output<RowData> output, RuntimeContext runtimeContext) {
            this.operatorOwner = obj;
            this.memoryManager = memoryManager;
            this.memorySize = j;
            this.timerService = internalTimerService;
            this.keyedStateBackend = (KeyedStateBackend) Preconditions.checkNotNull(keyedStateBackend);
            this.collector = (Output) Preconditions.checkNotNull(output);
            this.runtimeContext = (RuntimeContext) Preconditions.checkNotNull(runtimeContext);
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public Object getOperatorOwner() {
            return this.operatorOwner;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public MemoryManager getMemoryManager() {
            return this.memoryManager;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public long getMemorySize() {
            return this.memorySize;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.SyncStateWindowProcessor.SyncStateContext
        public KeyedStateBackend<RowData> getKeyedStateBackend() {
            return this.keyedStateBackend;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public InternalTimerService<W> getTimerService() {
            return this.timerService;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public void output(RowData rowData) {
            this.collector.collect(rowData);
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor.Context
        public RuntimeContext getRuntimeContext() {
            return this.runtimeContext;
        }
    }

    public WindowAggOperator(SyncStateWindowProcessor<W> syncStateWindowProcessor, boolean z) {
        this.windowProcessor = syncStateWindowProcessor;
        this.isEventTime = z;
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        this.lastTriggeredProcessingTime = Long.MIN_VALUE;
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.internalTimerService = getInternalTimerService("window-timers", this.windowProcessor.createWindowSerializer(), this);
        this.windowProcessor.open(new WindowProcessorSyncStateContext(getContainingTask(), getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), this.internalTimerService, getKeyedStateBackend(), this.collector, getRuntimeContext()));
        this.windowProcessor.initializeWatermark(this.currentWatermark);
        this.numLateRecordsDropped = this.metrics.counter("numLateRecordsDropped");
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(this.numLateRecordsDropped));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.internalTimerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(this.internalTimerService.currentProcessingTime() - currentWatermark);
        });
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        Iterable iterable;
        super.initializeState(stateInitializationContext);
        this.watermarkState = stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("watermark", LongSerializer.INSTANCE));
        if (!stateInitializationContext.isRestored() || (iterable = (Iterable) this.watermarkState.get()) == null) {
            return;
        }
        long j = Long.MAX_VALUE;
        Iterator it = iterable.iterator();
        while (it.hasNext()) {
            j = Math.min(((Long) it.next()).longValue(), j);
        }
        if (j != Long.MAX_VALUE) {
            this.currentWatermark = j;
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.watermarkState.update(Collections.singletonList(Long.valueOf(this.currentWatermark)));
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        this.windowProcessor.close();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        if (this.windowProcessor.processElement((RowData) getCurrentKey(), rowData)) {
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() <= this.currentWatermark) {
            super.processWatermark(new Watermark(this.currentWatermark));
            return;
        }
        if (this.isEventTime) {
            this.windowProcessor.advanceProgress(watermark.getTimestamp());
        }
        super.processWatermark(watermark);
    }

    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        onTimer(internalTimer);
    }

    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        if (internalTimer.getTimestamp() > this.lastTriggeredProcessingTime) {
            this.lastTriggeredProcessingTime = internalTimer.getTimestamp();
            this.windowProcessor.advanceProgress(internalTimer.getTimestamp());
        }
        onTimer(internalTimer);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void onTimer(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        Object namespace = internalTimer.getNamespace();
        this.windowProcessor.fireWindow(internalTimer.getTimestamp(), namespace);
        this.windowProcessor.clearWindow(internalTimer.getTimestamp(), namespace);
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        this.windowProcessor.prepareCheckpoint();
    }

    @VisibleForTesting
    public Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    @VisibleForTesting
    public Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }
}
