/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.RelativeClock;

/**
 * Stream operator that stamps every incoming record with a timestamp obtained from a
 * {@link TimestampAssigner} and emits watermarks produced by a {@link WatermarkGenerator}, both
 * created from the configured {@link WatermarkStrategy}.
 *
 * <p>Watermarks and watermark statuses arriving from upstream are dropped, with the single
 * exception of a watermark whose timestamp is {@code Long.MAX_VALUE}, which is forwarded.
 *
 * <p>When {@code emitProgressiveWatermarks} is {@code false}, a {@link NoWatermarksGenerator} is
 * used and no periodic timer is registered.
 *
 * @param <T> type of the records flowing through this operator
 */
public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, ProcessingTimeService.ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;

    /** Strategy from which the timestamp assigner and watermark generator are created. */
    private final WatermarkStrategy<T> watermarkStrategy;

    /** Timestamp assigner; created in {@link #open()}. */
    private transient TimestampAssigner<T> timestampAssigner;

    /** Watermark generator; created in {@link #open()}. */
    private transient WatermarkGenerator<T> watermarkGenerator;

    /** Gateway through which watermarks and idleness are emitted; created in {@link #open()}. */
    private transient WatermarkOutput wmOutput;

    /**
     * Auto-watermark interval read from the execution config in {@link #open()}. It is added to
     * the current processing time to schedule the next periodic emit (presumably milliseconds).
     */
    private transient long watermarkInterval;

    /** Whether the strategy's generator and the periodic timer are used at all. */
    private final boolean emitProgressiveWatermarks;

    /**
     * Clock handed to the watermark generator as its "input activity" clock. It is registered as
     * a back-pressure listener in {@link #open()} and unregistered in {@link #close()}.
     */
    private transient PausableRelativeClock inputActivityClock;

    /**
     * Creates the operator.
     *
     * @param parameters operator parameters passed through to {@link AbstractStreamOperator}
     * @param watermarkStrategy strategy providing the timestamp assigner and watermark generator;
     *     checked with {@link Preconditions#checkNotNull(Object)}
     * @param emitProgressiveWatermarks if {@code false}, a {@link NoWatermarksGenerator} is used
     *     instead of the strategy's generator and no periodic timer is registered
     */
    public TimestampsAndWatermarksOperator(
            StreamOperatorParameters<T> parameters,
            WatermarkStrategy<T> watermarkStrategy,
            boolean emitProgressiveWatermarks) {
        super(parameters);
        // checkNotNull is generic, so no (raw) cast is needed here.
        this.watermarkStrategy = Preconditions.checkNotNull(watermarkStrategy);
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
    }

    /**
     * Initializes the runtime state: the input-activity clock, the timestamp assigner, the
     * watermark generator and the watermark output. If the auto-watermark interval is positive
     * and progressive watermarks are enabled, the first periodic timer is registered.
     *
     * @throws Exception if the superclass or any of the created components fails to initialize
     */
    @Override
    public void open() throws Exception {
        super.open();

        this.inputActivityClock =
                new PausableRelativeClock(this.getProcessingTimeService().getClock());
        this.getContainingTask()
                .getEnvironment()
                .getMetricGroup()
                .getIOMetricGroup()
                .registerBackPressureListener(this.inputActivityClock);

        this.timestampAssigner =
                this.watermarkStrategy.createTimestampAssigner(this::getMetricGroup);

        if (this.emitProgressiveWatermarks) {
            this.watermarkGenerator =
                    this.watermarkStrategy.createWatermarkGenerator(
                            new WatermarkGeneratorSupplier.Context() {
                                @Override
                                public MetricGroup getMetricGroup() {
                                    return TimestampsAndWatermarksOperator.this.getMetricGroup();
                                }

                                @Override
                                public RelativeClock getInputActivityClock() {
                                    return TimestampsAndWatermarksOperator.this.inputActivityClock;
                                }
                            });
        } else {
            this.watermarkGenerator = new NoWatermarksGenerator<>();
        }

        this.wmOutput = new WatermarkEmitter(this.output);

        this.watermarkInterval = this.getExecutionConfig().getAutoWatermarkInterval();
        if (this.watermarkInterval > 0L && this.emitProgressiveWatermarks) {
            long now = this.getProcessingTimeService().getCurrentProcessingTime();
            this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
        }
    }

    /**
     * Unregisters the input-activity clock from the back-pressure listeners and closes the
     * superclass.
     *
     * @throws Exception if unregistering or the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.getContainingTask()
                    .getEnvironment()
                    .getMetricGroup()
                    .getIOMetricGroup()
                    .unregisterBackPressureListener(this.inputActivityClock);
        } finally {
            // Always run the superclass cleanup, even if unregistering the listener failed.
            super.close();
        }
    }

    /**
     * Assigns a new timestamp to the record, forwards it downstream and then notifies the
     * watermark generator of the event.
     *
     * @param element incoming record; its timestamp is overwritten in place
     * @throws Exception if the assigner, the output or the generator fails
     */
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        T event = element.getValue();
        // Records without a timestamp hand Long.MIN_VALUE to the assigner as the previous value.
        long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        long newTimestamp = this.timestampAssigner.extractTimestamp(event, previousTimestamp);
        element.setTimestamp(newTimestamp);
        // The record is emitted before the generator sees it, so any watermark the generator
        // produces for this event follows the record in the output.
        this.output.collect(element);
        this.watermarkGenerator.onEvent(event, newTimestamp, this.wmOutput);
    }

    /**
     * Timer callback: asks the generator for a periodic watermark and re-registers the timer one
     * interval after the current processing time.
     *
     * @param timestamp time for which the timer was registered (unused)
     * @throws Exception if the generator or timer registration fails
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
    }

    /**
     * Ignores watermarks coming from upstream, except a watermark with timestamp
     * {@code Long.MAX_VALUE}, which is forwarded as {@link Watermark#MAX_WATERMARK}.
     *
     * @param mark watermark received from upstream
     * @throws Exception if emitting the watermark fails
     */
    @Override
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark)
            throws Exception {
        // NOTE(review): Long.MAX_VALUE presumably is the end-of-input marker; forwarding it
        // keeps downstream watermark progress from stalling - confirm against Flink docs.
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            this.wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }

    /**
     * Intentionally a no-op: watermark statuses propagated from upstream are dropped.
     *
     * @param watermarkStatus status received from upstream (ignored)
     */
    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
    }

    /**
     * Finishes the superclass and then gives the generator one last chance to emit a watermark.
     *
     * @throws Exception if the superclass or the generator fails
     */
    @Override
    public void finish() throws Exception {
        super.finish();
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);
    }

    /**
     * {@link WatermarkOutput} that forwards to an operator {@link Output}. It only emits
     * watermarks that are strictly greater than the last emitted one, and only emits a
     * watermark status when the idle/active state actually changes.
     */
    public static final class WatermarkEmitter implements WatermarkOutput {
        private final Output<?> output;

        /** Timestamp of the last emitted watermark; starts at {@code Long.MIN_VALUE}. */
        private long currentWatermark;

        /** Whether {@link WatermarkStatus#IDLE} was the last status emitted. */
        private boolean idle;

        /**
         * @param output operator output that receives watermarks and watermark statuses
         */
        public WatermarkEmitter(Output<?> output) {
            this.output = output;
            this.currentWatermark = Long.MIN_VALUE;
        }

        /**
         * Emits the watermark if it advances beyond the last emitted one, switching the output
         * back to active first if it was idle. Non-advancing watermarks are dropped.
         *
         * @param watermark watermark to emit
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            long ts = watermark.getTimestamp();
            if (ts <= this.currentWatermark) {
                return;
            }
            this.currentWatermark = ts;
            this.markActive();
            this.output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
        }

        /** Emits {@link WatermarkStatus#IDLE} unless the output is already idle. */
        @Override
        public void markIdle() {
            if (!this.idle) {
                this.idle = true;
                this.output.emitWatermarkStatus(WatermarkStatus.IDLE);
            }
        }

        /** Emits {@link WatermarkStatus#ACTIVE} if the output is currently idle. */
        @Override
        public void markActive() {
            if (this.idle) {
                this.idle = false;
                this.output.emitWatermarkStatus(WatermarkStatus.ACTIVE);
            }
        }
    }
}

