package org.apache.flink.streaming.api.operators;

import java.util.function.Function;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorFactory.class */
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {
    private static final long serialVersionUID = 1;
    private final Source<OUT, ?, ?> source;
    private final WatermarkStrategy<OUT> watermarkStrategy;
    private final int numCoordinatorWorkerThread;

    public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
        this(source, watermarkStrategy, 1);
    }

    public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy, int i) {
        this.source = (Source) Preconditions.checkNotNull(source);
        this.watermarkStrategy = (WatermarkStrategy) Preconditions.checkNotNull(watermarkStrategy);
        this.numCoordinatorWorkerThread = i;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> streamOperatorParameters) {
        OperatorID operatorID = streamOperatorParameters.getStreamConfig().getOperatorID();
        OperatorEventGateway operatorEventGateway = streamOperatorParameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorID);
        Source<OUT, ?, ?> source = this.source;
        source.getClass();
        SourceOperator instantiateSourceOperator = instantiateSourceOperator(source::createReader, operatorEventGateway, this.source.getSplitSerializer(), this.watermarkStrategy, streamOperatorParameters.getProcessingTimeService());
        instantiateSourceOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
        streamOperatorParameters.getOperatorEventDispatcher().registerEventHandler(operatorID, instantiateSourceOperator);
        return instantiateSourceOperator;
    }

    @Override // org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory
    public OperatorCoordinator.Provider getCoordinatorProvider(String str, OperatorID operatorID) {
        return new SourceCoordinatorProvider(str, operatorID, this.source, this.numCoordinatorWorkerThread);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return SourceOperator.class;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public boolean isStreamSource() {
        return true;
    }

    private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(Function<SourceReaderContext, SourceReader<T, ?>> function, OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<?> simpleVersionedSerializer, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService processingTimeService) {
        return new SourceOperator<>(function, operatorEventGateway, simpleVersionedSerializer, watermarkStrategy, processingTimeService);
    }
}
