/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.util.Preconditions;

public class WatermarkAssignerOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>,
ProcessingTimeService.ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;
    private final int rowtimeFieldIndex;
    private final long idleTimeout;
    private final WatermarkGenerator watermarkGenerator;
    private transient long lastWatermark;
    private transient long watermarkInterval;
    private transient long timerInterval;
    private transient long currentWatermark;
    private transient long lastWatermarkPeriodicEmitTime;
    private transient long timeSinceLastIdleCheck;
    private transient WatermarkStatus currentStatus = WatermarkStatus.ACTIVE;
    private transient long processedElements;
    private transient long lastIdleCheckProcessedElements = -1L;
    private transient PausableRelativeClock inputActivityClock;

    public WatermarkAssignerOperator(StreamOperatorParameters<RowData> parameters, int rowtimeFieldIndex, WatermarkGenerator watermarkGenerator, long idleTimeout, ProcessingTimeService processingTimeService) {
        super(parameters);
        this.rowtimeFieldIndex = rowtimeFieldIndex;
        this.watermarkGenerator = watermarkGenerator;
        this.idleTimeout = idleTimeout;
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
    }

    public void open() throws Exception {
        long now;
        super.open();
        this.inputActivityClock = new PausableRelativeClock(this.getProcessingTimeService().getClock());
        this.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup().registerBackPressureListener((TimerGauge.StartStopListener)this.inputActivityClock);
        this.currentWatermark = 0L;
        this.watermarkInterval = this.getExecutionConfig().getAutoWatermarkInterval();
        this.lastWatermarkPeriodicEmitTime = now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.timeSinceLastIdleCheck = now;
        if (this.watermarkInterval > 0L || this.idleTimeout > 0L) {
            this.timerInterval = WatermarkAssignerOperator.calculateProcessingTimeTimerInterval(this.watermarkInterval, this.idleTimeout);
            this.getProcessingTimeService().registerTimer(now + this.timerInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
        FunctionUtils.setFunctionRuntimeContext((Function)this.watermarkGenerator, (RuntimeContext)this.getRuntimeContext());
        FunctionUtils.openFunction((Function)this.watermarkGenerator, (OpenContext)DefaultOpenContext.INSTANCE);
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData row;
        ++this.processedElements;
        if (this.isIdlenessEnabled() && this.currentStatus.equals((Object)WatermarkStatus.IDLE)) {
            this.emitWatermarkStatus(WatermarkStatus.ACTIVE);
        }
        if ((row = (RowData)element.getValue()).isNullAt(this.rowtimeFieldIndex)) {
            throw new RuntimeException("RowTime field should not be null, please convert it to a non-null long value.");
        }
        Long watermark = this.watermarkGenerator.currentWatermark(row);
        if (watermark != null) {
            this.currentWatermark = Math.max(this.currentWatermark, watermark);
        }
        this.output.collect(element);
        if (this.currentWatermark - this.lastWatermark > this.watermarkInterval) {
            this.advanceWatermark();
        }
    }

    private void advanceWatermark() {
        if (this.currentWatermark > this.lastWatermark) {
            this.lastWatermark = this.currentWatermark;
            this.output.emitWatermark(new Watermark(this.currentWatermark));
        }
    }

    public void onProcessingTime(long timestamp) throws Exception {
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        long inputActivityNow = this.inputActivityClock.relativeTimeMillis();
        if (this.watermarkInterval > 0L && this.lastWatermarkPeriodicEmitTime + this.watermarkInterval <= now) {
            this.lastWatermarkPeriodicEmitTime = now;
            this.advanceWatermark();
        }
        if (this.processedElements != this.lastIdleCheckProcessedElements) {
            this.timeSinceLastIdleCheck = inputActivityNow;
            this.lastIdleCheckProcessedElements = this.processedElements;
        }
        if (this.isIdlenessEnabled() && this.currentStatus.equals((Object)WatermarkStatus.ACTIVE) && this.timeSinceLastIdleCheck + this.idleTimeout <= inputActivityNow) {
            this.emitWatermarkStatus(WatermarkStatus.IDLE);
        }
        this.getProcessingTimeService().registerTimer(now + this.timerInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
    }

    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            if (this.isIdlenessEnabled() && this.currentStatus.equals((Object)WatermarkStatus.IDLE)) {
                this.emitWatermarkStatus(WatermarkStatus.ACTIVE);
            }
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
        }
    }

    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.emitWatermarkStatus(watermarkStatus);
    }

    private void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        this.currentStatus = watermarkStatus;
        this.output.emitWatermarkStatus(watermarkStatus);
    }

    public void finish() throws Exception {
        this.processWatermark(Watermark.MAX_WATERMARK);
    }

    public void close() throws Exception {
        this.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup().unregisterBackPressureListener((TimerGauge.StartStopListener)this.inputActivityClock);
        FunctionUtils.closeFunction((Function)this.watermarkGenerator);
        super.close();
    }

    private boolean isIdlenessEnabled() {
        return this.idleTimeout > 0L;
    }

    @VisibleForTesting
    static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
        long largerInterval;
        Preconditions.checkArgument((watermarkInterval > 0L || idleTimeout > 0L ? 1 : 0) != 0);
        if (watermarkInterval <= 0L) {
            return idleTimeout;
        }
        if (idleTimeout <= 0L) {
            return watermarkInterval;
        }
        long smallerInterval = Math.min(watermarkInterval, idleTimeout);
        long timerInterval = smallerInterval * 5L < (largerInterval = Math.max(watermarkInterval, idleTimeout)) ? smallerInterval : smallerInterval / 5L;
        return Math.max(timerInterval, 1L);
    }
}

