package org.apache.flink.datastream.impl.extension.eventtime.functions;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.common.watermark.WatermarkManager;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.class */
public class ExtractEventTimeProcessFunction<IN> implements OneInputStreamProcessFunction<IN, IN>, ProcessingTimeService.ProcessingTimeCallback {
    private final EventTimeWatermarkStrategy<IN> watermarkStrategy;
    private boolean enableIdleStatus;
    private WatermarksWithIdleness.IdlenessTimer idlenessTimer;
    private final long maxOutOfOrderTimeInMs;
    private org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService;
    private WatermarkManager watermarkManager;
    private long currentMaxEventTime = Long.MIN_VALUE;
    private long lastEmittedEventTime = Long.MIN_VALUE;
    private long periodicTimerInterval = 0;
    private boolean isIdleNow = false;

    public ExtractEventTimeProcessFunction(EventTimeWatermarkStrategy<IN> eventTimeWatermarkStrategy) {
        this.watermarkStrategy = eventTimeWatermarkStrategy;
        if (eventTimeWatermarkStrategy.getIdleTimeout().toMillis() > 0) {
            this.enableIdleStatus = true;
        }
        this.maxOutOfOrderTimeInMs = eventTimeWatermarkStrategy.getMaxOutOfOrderTime().toMillis();
    }

    public void initEventTimeExtension(ExecutionConfig executionConfig, WatermarkManager watermarkManager, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) {
        this.processingTimeService = processingTimeService;
        this.watermarkManager = watermarkManager;
        if (this.enableIdleStatus) {
            this.idlenessTimer = new WatermarksWithIdleness.IdlenessTimer(processingTimeService.getClock(), this.watermarkStrategy.getIdleTimeout());
        }
        boolean z = this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC || this.enableIdleStatus;
        this.periodicTimerInterval = executionConfig.getAutoWatermarkInterval();
        if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC && !this.watermarkStrategy.getPeriodicWatermarkInterval().isZero()) {
            this.periodicTimerInterval = this.watermarkStrategy.getPeriodicWatermarkInterval().toMillis();
        }
        Preconditions.checkState(this.periodicTimerInterval > 0, "Watermark interval " + this.periodicTimerInterval + " should large to 0.");
        if (z) {
            processingTimeService.registerTimer(processingTimeService.getCurrentProcessingTime() + this.periodicTimerInterval, this);
        }
    }

    public Set<? extends WatermarkDeclaration> declareWatermarks() {
        HashSet hashSet = new HashSet();
        hashSet.add(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION);
        if (this.enableIdleStatus) {
            hashSet.add(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION);
        }
        return hashSet;
    }

    public void processRecord(IN in, Collector<IN> collector, PartitionedContext<IN> partitionedContext) throws Exception {
        if (this.enableIdleStatus) {
            if (this.isIdleNow) {
                this.watermarkManager.emitWatermark(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false));
                this.isIdleNow = false;
            }
            this.idlenessTimer.activity();
        }
        long extractTimestamp = this.watermarkStrategy.getEventTimeExtractor().extractTimestamp(in);
        this.currentMaxEventTime = Math.max(this.currentMaxEventTime, extractTimestamp);
        collector.collectAndOverwriteTimestamp(in, extractTimestamp);
        if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT) {
            tryEmitEventTimeWatermark(partitionedContext.getNonPartitionedContext().getWatermarkManager());
        }
    }

    public void onProcessingTime(long j) throws IOException, InterruptedException, Exception {
        if (this.enableIdleStatus && this.idlenessTimer.checkIfIdle()) {
            if (!this.isIdleNow) {
                this.watermarkManager.emitWatermark(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true));
                this.isIdleNow = true;
            }
        } else if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC) {
            tryEmitEventTimeWatermark(this.watermarkManager);
        }
        this.processingTimeService.registerTimer(j + this.periodicTimerInterval, this);
    }

    private void tryEmitEventTimeWatermark(WatermarkManager watermarkManager) {
        if (this.currentMaxEventTime == Long.MIN_VALUE) {
            return;
        }
        long j = this.currentMaxEventTime - this.maxOutOfOrderTimeInMs;
        if (j > this.lastEmittedEventTime) {
            watermarkManager.emitWatermark(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(j));
            this.lastEmittedEventTime = j;
        }
    }
}
