package org.apache.flink.streaming.api.operators;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorFactory.class */
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {
    private static final long serialVersionUID = 1;
    private final Source<OUT, ?, ?> source;
    private final WatermarkStrategy<OUT> watermarkStrategy;
    private final boolean emitProgressiveWatermarks;
    private final int numCoordinatorWorkerThread;

    @Nullable
    private String coordinatorListeningID;

    public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
        this(source, watermarkStrategy, true, 1);
    }

    public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy, boolean z) {
        this(source, watermarkStrategy, z, 1);
    }

    public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy, boolean z, int i) {
        this.source = (Source) Preconditions.checkNotNull(source);
        this.watermarkStrategy = (WatermarkStrategy) Preconditions.checkNotNull(watermarkStrategy);
        this.emitProgressiveWatermarks = z;
        this.numCoordinatorWorkerThread = i;
    }

    public Boundedness getBoundedness() {
        return this.source.getBoundedness();
    }

    public void setCoordinatorListeningID(@Nullable String str) {
        this.coordinatorListeningID = str;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> streamOperatorParameters) {
        OperatorID operatorID = streamOperatorParameters.getStreamConfig().getOperatorID();
        OperatorEventGateway operatorEventGateway = streamOperatorParameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorID);
        Source<OUT, ?, ?> source = this.source;
        Objects.requireNonNull(source);
        SourceOperator instantiateSourceOperator = instantiateSourceOperator(streamOperatorParameters, source::createReader, operatorEventGateway, this.source.getSplitSerializer(), this.watermarkStrategy, streamOperatorParameters.getProcessingTimeService(), streamOperatorParameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), streamOperatorParameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress(), this.emitProgressiveWatermarks, streamOperatorParameters.getContainingTask().getCanEmitBatchOfRecords(), getSourceWatermarkDeclarations());
        streamOperatorParameters.getOperatorEventDispatcher().registerEventHandler(operatorID, instantiateSourceOperator);
        return instantiateSourceOperator;
    }

    @Override // org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory
    public OperatorCoordinator.Provider getCoordinatorProvider(String str, OperatorID operatorID) {
        return new SourceCoordinatorProvider(str, operatorID, this.source, this.numCoordinatorWorkerThread, this.watermarkStrategy.getAlignmentParameters(), this.coordinatorListeningID);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return SourceOperator.class;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public boolean isStreamSource() {
        return true;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public boolean isOutputTypeConfigurable() {
        return this.source instanceof OutputTypeConfigurable;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        if (this.source instanceof OutputTypeConfigurable) {
            ((OutputTypeConfigurable) this.source).setOutputType(typeInformation, executionConfig);
        }
    }

    public Set<? extends WatermarkDeclaration> getSourceWatermarkDeclarations() {
        return this.source.declareWatermarks();
    }

    private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(StreamOperatorParameters<T> streamOperatorParameters, FunctionWithException<SourceReaderContext, SourceReader<T, ?>, Exception> functionWithException, OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<?> simpleVersionedSerializer, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService processingTimeService, Configuration configuration, String str, boolean z, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker, Collection<? extends WatermarkDeclaration> collection) {
        return new SourceOperator<>(streamOperatorParameters, functionWithException, operatorEventGateway, simpleVersionedSerializer, watermarkStrategy, processingTimeService, configuration, str, z, canEmitBatchOfRecordsChecker, (Map) WatermarkUtils.convertToInternalWatermarkDeclarations(new HashSet(collection)).stream().collect(Collectors.toMap((v0) -> {
            return v0.getIdentifier();
        }, (v0) -> {
            return v0.isAligned();
        })));
    }
}
