package org.apache.flink.streaming.api.operators;

import java.time.Duration;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

@Internal
@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSource.class */
public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> {
    private static final long serialVersionUID = 1;
    private final boolean emitProgressiveWatermarks;
    private transient SourceFunction.SourceContext<OUT> ctx;
    private volatile transient boolean canceledOrStopped;

    public StreamSource(SRC src, boolean z) {
        super(src);
        this.canceledOrStopped = false;
        this.emitProgressiveWatermarks = z;
    }

    public StreamSource(SRC src) {
        this(src, true);
    }

    @VisibleForTesting
    public boolean emitsProgressiveWatermarks() {
        return this.emitProgressiveWatermarks;
    }

    public void run(Object obj, OperatorChain<?, ?> operatorChain) throws Exception {
        run(obj, this.output, operatorChain);
    }

    public void run(Object obj, Output<StreamRecord<OUT>> output, OperatorChain<?, ?> operatorChain) throws Exception {
        long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured() ? getExecutionConfig().getLatencyTrackingInterval() : ((Duration) getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration().get(MetricOptions.LATENCY_INTERVAL)).toMillis();
        LatencyMarkerEmitter latencyMarkerEmitter = null;
        if (latencyTrackingInterval > 0) {
            ProcessingTimeService processingTimeService = getProcessingTimeService();
            Objects.requireNonNull(output);
            latencyMarkerEmitter = new LatencyMarkerEmitter(processingTimeService, output::emitLatencyMarker, latencyTrackingInterval, getOperatorID(), getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }
        this.ctx = StreamSourceContexts.getSourceContext(getProcessingTimeService(), obj, output, getExecutionConfig().getAutoWatermarkInterval(), -1L, this.emitProgressiveWatermarks);
        try {
            ((SourceFunction) this.userFunction).run(this.ctx);
            if (latencyMarkerEmitter != null) {
                latencyMarkerEmitter.close();
            }
        } catch (Throwable th) {
            if (latencyMarkerEmitter != null) {
                latencyMarkerEmitter.close();
            }
            throw th;
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        if (this.ctx != null) {
            this.ctx.close();
        }
        super.close();
    }

    public void stop() {
        ((SourceFunction) this.userFunction).cancel();
    }

    public void cancel() {
        markCanceledOrStopped();
        ((SourceFunction) this.userFunction).cancel();
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    protected void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    protected boolean isCanceledOrStopped() {
        return this.canceledOrStopped;
    }
}
