package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraph.class */
public class StreamGraph implements Pipeline, ExecutionPlan {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    private long initialClientHeartbeatTimeout;
    // Defensive copy of the configuration handed to the constructor.
    private final Configuration jobConfiguration;
    // Transient; re-populated by UserDefinedObjectsHolder#deserialize.
    private transient ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private SavepointRestoreSettings savepointRestoreSettings;
    private GlobalStreamExchangeMode globalExchangeMode;
    private boolean enableCheckpointsAfterTasksFinish;
    // Vertex id -> node. Transient; re-populated by UserDefinedObjectsHolder#deserialize.
    private transient Map<Integer, StreamNode> streamNodes;
    // Ids registered through addSource / addLegacySource.
    private Set<Integer> sources;
    // Ids registered through addSink.
    private Set<Integer> sinks;
    // Virtual id -> (original vertex id, side-output tag). Transient; restored on deserialize.
    private transient Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    // Virtual id -> (original vertex id, partitioner, exchange mode). Transient; restored on deserialize.
    private transient Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes;
    protected Map<Integer, String> vertexIDtoBrokerID;
    // A non-empty map makes isIterative() return true.
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private transient StateBackend stateBackend;
    private transient CheckpointStorage checkpointStorage;
    private InternalTimeServiceManager.Provider timerServiceProvider;
    private transient LineageGraph lineageGraph;
    private Map<String, ResourceProfile> slotSharingGroupResources;
    private boolean dynamic;
    private boolean autoParallelismEnabled;
    private JobCheckpointingSettings checkpointingSettings;
    private boolean isEmpty;
    private UserDefinedObjectsHolder userDefinedObjectsHolder;
    private byte[] serializedWatermarkDeclarations;
    private boolean allVerticesInSameSlotSharingGroupByDefault = true;
    private JobType jobType = JobType.STREAMING;
    private PipelineOptions.VertexDescriptionMode descriptionMode = PipelineOptions.VertexDescriptionMode.TREE;
    private boolean vertexNameIncludeIndexPrefix = false;
    // Collections below are instantiated with the diamond operator so that they are not raw types.
    private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();
    // Filled by cacheHeadOperatorForNode and read by getHeadOperatorForNodeFromCache.
    private final Map<StreamNode, StreamOperatorFactory<?>> nodeToHeadOperatorCache = new HashMap<>();
    private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
    private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
    private List<URL> classpath = Collections.emptyList();
    // Filled by addJar, which skips paths that are already present.
    private final List<Path> userJars = new ArrayList<>();
    private final Map<Integer, ResourceSpec> streamNodeMinResources = new HashMap<>();
    private JobID jobId = new JobID();
    private String jobName = "(unnamed job)";

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraph$UserDefinedObjectsHolder.class */
    /**
     * Serializable holder for the parts of the enclosing {@link StreamGraph} that are kept in
     * transient fields there: the stream nodes, the virtual side-output and partition nodes and
     * the {@link ExecutionConfig}.
     *
     * <p>Operator factories are serialized and deserialized one task per node on the supplied
     * {@link Executor}; everything else is wrapped in a single {@link SerializedValue}. This is a
     * non-static inner class because {@link #deserialize} writes the restored values straight back
     * into the enclosing graph instance.
     */
    private class UserDefinedObjectsHolder implements Serializable {
        private static final long serialVersionUID = 1;
        private final SerializedValue<Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>> serializedVirtualPartitionNodes;
        private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
        private SerializedValue<Map<Integer, StreamNode>> serializedStreamNodes;
        private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> streamNodeToSerializedOperatorFactories;
        private final SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>> serializedVirtualSideOutputNodes;

        /**
         * Serializes all given objects eagerly.
         *
         * @param map stream nodes keyed by vertex id
         * @param map2 virtual side-output nodes
         * @param map3 virtual partition nodes
         * @param executionConfig execution config to serialize
         * @param executor executor running the per-node operator factory serialization
         * @throws IOException if one of the {@link SerializedValue} wrappers cannot be created
         * @throws RuntimeException if the stream nodes or their operator factories cannot be
         *     serialized
         */
        public UserDefinedObjectsHolder(Map<Integer, StreamNode> map, Map<Integer, Tuple2<Integer, OutputTag>> map2, Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> map3, ExecutionConfig executionConfig, Executor executor) throws IOException {
            serializeStreamNodes(map, executor);
            this.serializedVirtualSideOutputNodes = new SerializedValue<>(map2);
            this.serializedVirtualPartitionNodes = new SerializedValue<>(map3);
            this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
        }

        /** Serializes the operator factories first and the node map afterwards. */
        private void serializeStreamNodes(Map<Integer, StreamNode> nodesById, Executor executor) {
            try {
                this.streamNodeToSerializedOperatorFactories = serializeOperatorFactories(nodesById.values(), executor);
                this.serializedStreamNodes = new SerializedValue<>(nodesById);
            } catch (Exception e) {
                throw new RuntimeException("Could not serialize stream nodes", e);
            }
        }

        /**
         * Starts one serialization task per node that has an operator factory and blocks until
         * all of them have completed.
         */
        private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> serializeOperatorFactories(Collection<StreamNode> nodes, Executor executor) throws Exception {
            List<CompletableFuture<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>>> pending =
                    nodes.stream()
                            .filter(node -> node.getOperatorFactory() != null)
                            .map(node -> serializeOperatorFactoriesAsync(executor, node))
                            .collect(Collectors.toList());
            return FutureUtils.combineAll(pending).get();
        }

        /** Serializes the operator factory of a single node on the given executor. */
        private CompletableFuture<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> serializeOperatorFactoriesAsync(Executor executor, StreamNode streamNode) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    SerializedValue<StreamOperatorFactory<?>> serializedFactory = new SerializedValue<>(streamNode.getOperatorFactory());
                    return Tuple2.of(Integer.valueOf(streamNode.getId()), serializedFactory);
                } catch (Throwable th) {
                    throw new RuntimeException(String.format("Could not serialize stream node %s", streamNode), th);
                }
            }, executor);
        }

        /**
         * Restores the transient state of the enclosing graph. Operator factories are
         * deserialized first; they are attached to their nodes only after the node map itself has
         * been restored.
         */
        private void deserialize(ClassLoader classLoader, Executor executor) throws Exception {
            Collection<Tuple2<Integer, StreamOperatorFactory<?>>> operatorFactories = deserializeOperators(classLoader, executor);
            StreamGraph.this.virtualSideOutputNodes = this.serializedVirtualSideOutputNodes.deserializeValue(classLoader);
            StreamGraph.this.virtualPartitionNodes = this.serializedVirtualPartitionNodes.deserializeValue(classLoader);
            StreamGraph.this.executionConfig = this.serializedExecutionConfig.deserializeValue(classLoader);
            StreamGraph.this.streamNodes = this.serializedStreamNodes.deserializeValue(classLoader);
            for (Tuple2<Integer, StreamOperatorFactory<?>> idAndFactory : operatorFactories) {
                StreamGraph.this.getStreamNode(idAndFactory.f0).setOperatorFactory(idAndFactory.f1);
            }
        }

        /** Deserializes all stored operator factories on the executor and waits for the results. */
        private Collection<Tuple2<Integer, StreamOperatorFactory<?>>> deserializeOperators(ClassLoader classLoader, Executor executor) throws Exception {
            List<CompletableFuture<Tuple2<Integer, StreamOperatorFactory<?>>>> pending =
                    this.streamNodeToSerializedOperatorFactories.stream()
                            .map(entry -> deserializeOperatorFactoriesAsync(classLoader, executor, entry))
                            .collect(Collectors.toList());
            return FutureUtils.combineAll(pending).get();
        }

        /** Deserializes a single (vertex id, serialized factory) pair on the given executor. */
        private CompletableFuture<Tuple2<Integer, StreamOperatorFactory<?>>> deserializeOperatorFactoriesAsync(ClassLoader classLoader, Executor executor, Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>> tuple2) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    StreamOperatorFactory<?> factory = tuple2.f1.deserializeValue(classLoader);
                    return Tuple2.of(tuple2.f0, factory);
                } catch (Throwable th) {
                    throw new RuntimeException(String.format("Could not deserialize stream node %s", tuple2.f0), th);
                }
            }, executor);
        }
    }

    /**
     * Creates an empty stream graph.
     *
     * @param configuration job configuration; a copy of it is stored
     * @param executionConfig execution config, must not be null
     * @param checkpointConfig checkpoint config, must not be null
     * @param savepointRestoreSettings savepoint restore settings, must not be null
     */
    public StreamGraph(Configuration configuration, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, SavepointRestoreSettings savepointRestoreSettings) {
        // Default value; replaced by the validated argument further down.
        this.savepointRestoreSettings = SavepointRestoreSettings.none();

        this.jobConfiguration = new Configuration(Preconditions.checkNotNull(configuration));
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = Preconditions.checkNotNull(checkpointConfig);
        this.savepointRestoreSettings = Preconditions.checkNotNull(savepointRestoreSettings);

        // Allocates the node, source, sink and resource collections.
        clear();
    }

    /**
     * Resets the graph structure by replacing the node, virtual node, iteration, source, sink and
     * slot-sharing-group resource collections with fresh empty instances. The collections are
     * created with the diamond operator instead of raw types.
     */
    public void clear() {
        this.streamNodes = new HashMap<>();
        this.virtualSideOutputNodes = new HashMap<>();
        this.virtualPartitionNodes = new HashMap<>();
        this.vertexIDtoBrokerID = new HashMap<>();
        this.vertexIDtoLoopTimeout = new HashMap<>();
        this.sources = new HashSet<>();
        this.sinks = new HashSet<>();
        this.slotSharingGroupResources = new HashMap<>();
    }

    /** Returns the execution config currently held by this graph. */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /** Returns the job configuration (the copy created in the constructor). */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** Returns the checkpoint config this graph was created with. */
    public CheckpointConfig getCheckpointConfig() {
        return checkpointConfig;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Remembers {@code streamOperatorFactory} as the head operator of {@code streamNode},
     * replacing any previously cached entry for that node.
     */
    public void cacheHeadOperatorForNode(StreamNode streamNode, StreamOperatorFactory<?> streamOperatorFactory) {
        nodeToHeadOperatorCache.put(streamNode, streamOperatorFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the head operator cached for {@code streamNode}.
     *
     * @return the cached factory, or {@code null} if nothing was cached for the node
     */
    public StreamOperatorFactory<?> getHeadOperatorForNodeFromCache(StreamNode streamNode) {
        StreamOperatorFactory<?> cached = nodeToHeadOperatorCache.get(streamNode);
        return cached;
    }

    /** Returns the effective checkpointing mode derived from this graph's checkpoint config. */
    public CheckpointingMode getCheckpointingMode() {
        return getCheckpointingMode(checkpointConfig);
    }

    /**
     * Determines the effective checkpointing mode for the given config.
     *
     * <p>The configured consistency mode must be {@code EXACTLY_ONCE} or {@code AT_LEAST_ONCE},
     * otherwise the {@code Preconditions.checkArgument} check fails. The configured mode is only
     * returned when checkpointing is enabled; with checkpointing disabled the result is always
     * {@code AT_LEAST_ONCE}.
     *
     * @param checkpointConfig config to inspect
     * @return the effective checkpointing mode
     */
    public static CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) {
        final CheckpointingMode configuredMode = checkpointConfig.getCheckpointingConsistencyMode();
        final boolean supported =
                configuredMode == CheckpointingMode.EXACTLY_ONCE
                        || configuredMode == CheckpointingMode.AT_LEAST_ONCE;
        Preconditions.checkArgument(supported, "Unexpected checkpointing mode.");

        if (checkpointConfig.isCheckpointingEnabled()) {
            return configuredMode;
        }
        return CheckpointingMode.AT_LEAST_ONCE;
    }

    /**
     * Registers a user jar unless the same path has been registered before.
     *
     * @param path jar location, must not be null
     * @throws IllegalArgumentException if {@code path} is null
     */
    public void addJar(Path path) {
        if (path == null) {
            throw new IllegalArgumentException();
        }
        if (!userJars.contains(path)) {
            userJars.add(path);
        }
    }

    /** Returns the internal (live, mutable) list of registered user jars. */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public List<Path> getUserJars() {
        return userJars;
    }

    /** Builds the job checkpointing settings from the current graph state and stores them. */
    public void createJobCheckpointingSettings() {
        final JobCheckpointingSettings settings = createJobCheckpointingSettingsInternal();
        this.checkpointingSettings = settings;
    }

    /**
     * Assembles the {@link JobCheckpointingSettings} for this graph.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>Read the checkpoint interval; values below 10 are replaced by {@code Long.MAX_VALUE}.
     *   <li>Derive the retention policy from the externalized-checkpoint settings.
     *   <li>Collect a master hook factory for every UDF operator whose user function implements
     *       {@link WithMasterCheckpointHook}.
     *   <li>Serialize the hook factories, the state backend and the checkpoint storage (each is
     *       {@code null} when absent).
     * </ol>
     *
     * <p>The user function is held as {@code Object} and cast only after the {@code instanceof}
     * check, and all {@link SerializedValue} locals are parameterized instead of raw.
     *
     * @return the assembled settings
     * @throws IllegalStateException if externalized checkpoints are enabled but no retention mode
     *     is configured
     * @throws FlinkRuntimeException if the hooks, the state backend or the checkpoint storage
     *     cannot be serialized
     */
    private JobCheckpointingSettings createJobCheckpointingSettingsInternal() {
        CheckpointConfig config = getCheckpointConfig();

        long interval = config.getCheckpointInterval();
        if (interval < 10) {
            // Too small to be used as a periodic interval; Long.MAX_VALUE is passed on instead.
            interval = Long.MAX_VALUE;
        }

        CheckpointRetentionPolicy retentionPolicy;
        if (config.isExternalizedCheckpointsEnabled()) {
            ExternalizedCheckpointRetention retention = config.getExternalizedCheckpointRetention();
            if (retention == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionPolicy =
                    retention.deleteOnCancellation()
                            ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
                            : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }

        // Collect master hooks from user functions that provide one.
        List<MasterTriggerRestoreHook.Factory> hookFactories = new ArrayList<>();
        for (StreamNode node : getStreamNodes()) {
            StreamOperatorFactory<?> operatorFactory = node.getOperatorFactory();
            // instanceof is false for null, so no separate null check is needed.
            if (operatorFactory instanceof UdfStreamOperatorFactory) {
                Object userFunction = ((UdfStreamOperatorFactory<?>) operatorFactory).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    hookFactories.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) userFunction));
                }
            }
        }

        SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        if (hookFactories.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                serializedHooks = new SerializedValue<>(hookFactories.toArray(new MasterTriggerRestoreHook.Factory[0]));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        SerializedValue<StateBackend> serializedStateBackend;
        if (this.stateBackend == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend = new SerializedValue<>(this.stateBackend);
            } catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        SerializedValue<CheckpointStorage> serializedCheckpointStorage;
        if (this.checkpointStorage == null) {
            serializedCheckpointStorage = null;
        } else {
            try {
                serializedCheckpointStorage = new SerializedValue<>(this.checkpointStorage);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
            }
        }

        CheckpointCoordinatorConfiguration coordinatorConfiguration =
                CheckpointCoordinatorConfiguration.builder()
                        .setCheckpointInterval(interval)
                        .setCheckpointIntervalDuringBacklog(config.getCheckpointIntervalDuringBacklog())
                        .setCheckpointTimeout(config.getCheckpointTimeout())
                        .setMinPauseBetweenCheckpoints(config.getMinPauseBetweenCheckpoints())
                        .setMaxConcurrentCheckpoints(config.getMaxConcurrentCheckpoints())
                        .setCheckpointRetentionPolicy(retentionPolicy)
                        .setExactlyOnce(getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE)
                        .setTolerableCheckpointFailureNumber(config.getTolerableCheckpointFailureNumber())
                        .setUnalignedCheckpointsEnabled(config.isUnalignedCheckpointsEnabled())
                        .setCheckpointIdOfIgnoredInFlightData(config.getCheckpointIdOfIgnoredInFlightData())
                        .setAlignedCheckpointTimeout(config.getAlignedCheckpointTimeout().toMillis())
                        .setEnableCheckpointsAfterTasksFinish(isEnableCheckpointsAfterTasksFinish())
                        .build();

        // UNDEFINED when the changelog option is not set in the job configuration.
        TernaryBoolean changelogEnabled =
                getJobConfiguration()
                        .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
                        .map(TernaryBoolean::fromBoolean)
                        .orElse(TernaryBoolean.UNDEFINED);

        // UNDEFINED when no state backend has been set on this graph.
        TernaryBoolean stateBackendUsesManagedMemory =
                Optional.ofNullable(this.stateBackend)
                        .map(StateBackend::useManagedMemory)
                        .map(TernaryBoolean::fromBoolean)
                        .orElse(TernaryBoolean.UNDEFINED);

        return new JobCheckpointingSettings(
                coordinatorConfiguration,
                serializedStateBackend,
                changelogEnabled,
                serializedCheckpointStorage,
                serializedHooks,
                stateBackendUsesManagedMemory);
    }

    /**
     * Replaces the savepoint restore settings. Unlike the constructor, this setter performs no
     * null check.
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    /**
     * Returns the serialized execution config stored in the user-defined-objects holder.
     * The holder must already exist; otherwise the {@code Preconditions.checkNotNull} check fails.
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
        final UserDefinedObjectsHolder holder = userDefinedObjectsHolder;
        Preconditions.checkNotNull(holder);
        return holder.serializedExecutionConfig;
    }

    /** Returns the savepoint restore settings currently set on this graph. */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointRestoreSettings;
    }

    /** Returns the job name; it is {@code "(unnamed job)"} until {@link #setJobName} is called. */
    public String getJobName() {
        return jobName;
    }

    /**
     * Sets the job name.
     *
     * @param str new job name; stored as-is without validation
     */
    public void setJobName(String str) {
        jobName = str;
    }

    /** Returns the lineage graph, or {@code null} if none has been set. */
    public LineageGraph getLineageGraph() {
        return lineageGraph;
    }

    /** Stores the lineage graph for this job. The backing field is transient. */
    public void setLineageGraph(LineageGraph lineageGraph) {
        this.lineageGraph = lineageGraph;
    }

    /**
     * Sets the state backend. It is serialized into the job checkpointing settings when those
     * are created; {@code null} means no backend is serialized.
     */
    public void setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
    }

    /** Returns the state backend set on this graph, or {@code null} if none was set. */
    @VisibleForTesting
    public StateBackend getStateBackend() {
        return stateBackend;
    }

    /**
     * Sets the checkpoint storage. It is serialized into the job checkpointing settings when
     * those are created; {@code null} means no storage is serialized.
     */
    public void setCheckpointStorage(CheckpointStorage checkpointStorage) {
        this.checkpointStorage = checkpointStorage;
    }

    /** Returns the timer service provider, or {@code null} if none has been set yet. */
    public InternalTimeServiceManager.Provider getTimerServiceProvider() {
        return timerServiceProvider;
    }

    /**
     * Sets the timer service provider.
     *
     * @param provider provider to use; checked with {@code Preconditions.checkNotNull}
     */
    public void setTimerServiceProvider(InternalTimeServiceManager.Provider provider) {
        timerServiceProvider = Preconditions.checkNotNull(provider);
    }

    /** Returns the global stream exchange mode, or {@code null} if it has not been set. */
    public GlobalStreamExchangeMode getGlobalStreamExchangeMode() {
        return globalExchangeMode;
    }

    /** Sets the global stream exchange mode; the value is stored without validation. */
    public void setGlobalStreamExchangeMode(GlobalStreamExchangeMode globalStreamExchangeMode) {
        globalExchangeMode = globalStreamExchangeMode;
    }

    /**
     * Merges the given slot-sharing-group resources into the ones already known. Existing
     * entries with the same group name are overwritten; other entries are kept.
     *
     * @param map resource profiles keyed by slot sharing group name
     */
    public void setSlotSharingGroupResource(Map<String, ResourceProfile> map) {
        slotSharingGroupResources.putAll(map);
    }

    /**
     * Looks up the resource profile registered for a slot sharing group.
     *
     * @param str slot sharing group name
     * @return the registered profile, or an empty optional if there is none
     */
    public Optional<ResourceProfile> getSlotSharingGroupResource(String str) {
        final ResourceProfile profile = slotSharingGroupResources.get(str);
        return Optional.ofNullable(profile);
    }

    /**
     * Tells whether at least one slot sharing group has a resource profile other than
     * {@code ResourceProfile.UNKNOWN}.
     */
    public boolean hasFineGrainedResource() {
        for (ResourceProfile profile : slotSharingGroupResources.values()) {
            if (!profile.equals(ResourceProfile.UNKNOWN)) {
                return true;
            }
        }
        return false;
    }

    /** Sets the "all vertices in the same slot sharing group by default" flag. */
    public void setAllVerticesInSameSlotSharingGroupByDefault(boolean z) {
        allVerticesInSameSlotSharingGroupByDefault = z;
    }

    /**
     * Returns the "all vertices in the same slot sharing group by default" flag; it is
     * {@code true} unless changed through the setter.
     */
    public boolean isAllVerticesInSameSlotSharingGroupByDefault() {
        return allVerticesInSameSlotSharingGroupByDefault;
    }

    /** Returns the flag that is passed to the checkpoint coordinator configuration. */
    public boolean isEnableCheckpointsAfterTasksFinish() {
        return enableCheckpointsAfterTasksFinish;
    }

    /** Sets the "enable checkpoints after tasks finish" flag. */
    public void setEnableCheckpointsAfterTasksFinish(boolean z) {
        enableCheckpointsAfterTasksFinish = z;
    }

    /** Reads {@code PipelineOptions.OPERATOR_CHAINING} from the job configuration. */
    public boolean isChainingEnabled() {
        final Boolean chainingEnabled = jobConfiguration.get(PipelineOptions.OPERATOR_CHAINING);
        return chainingEnabled.booleanValue();
    }

    /**
     * Reads {@code PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM}
     * from the job configuration.
     */
    public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() {
        final Boolean enabled = jobConfiguration.get(PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM);
        return enabled.booleanValue();
    }

    /** Returns {@code true} if at least one loop timeout has been registered for a vertex. */
    public boolean isIterative() {
        final boolean noLoopTimeouts = vertexIDtoLoopTimeout.isEmpty();
        return !noLoopTimeouts;
    }

    /**
     * Adds a source operator that runs as a {@code SourceOperatorStreamTask} and records its id
     * as a source of this graph.
     *
     * @param num vertex id
     * @param str forwarded to the node (presumably the slot sharing group; confirm against
     *     {@code StreamNode})
     * @param str2 forwarded to the node (presumably the co-location group; confirm)
     * @param sourceOperatorFactory factory of the source operator
     * @param typeInformation input type information
     * @param typeInformation2 output type information
     * @param str3 forwarded to the node (presumably the operator name; confirm)
     */
    public <IN, OUT> void addSource(Integer num, @Nullable String str, @Nullable String str2, SourceOperatorFactory<OUT> sourceOperatorFactory, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str3) {
        final Class<SourceOperatorStreamTask> invokableClass = SourceOperatorStreamTask.class;
        addOperator(num, str, str2, sourceOperatorFactory, typeInformation, typeInformation2, str3, invokableClass);
        sources.add(num);
    }

    /**
     * Adds a source through the public {@code addOperator} overload, which selects the task
     * class from the factory, and records the vertex id as a source of this graph.
     *
     * @param num vertex id; the remaining arguments are forwarded unchanged to {@code addOperator}
     */
    public <IN, OUT> void addLegacySource(Integer num, @Nullable String str, @Nullable String str2, StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str3) {
        addOperator(num, str, str2, streamOperatorFactory, typeInformation, typeInformation2, str3);
        sources.add(num);
    }

    /**
     * Adds a sink operator and records its id as a sink of this graph. If the factory is an
     * {@code OutputFormatOperatorFactory}, its output format is also registered for the vertex.
     *
     * @param num vertex id; the remaining arguments are forwarded unchanged to {@code addOperator}
     */
    public <IN, OUT> void addSink(Integer num, @Nullable String str, @Nullable String str2, StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str3) {
        addOperator(num, str, str2, streamOperatorFactory, typeInformation, typeInformation2, str3);

        final boolean hasOutputFormat = streamOperatorFactory instanceof OutputFormatOperatorFactory;
        if (hasOutputFormat) {
            setOutputFormat(num, ((OutputFormatOperatorFactory) streamOperatorFactory).getOutputFormat());
        }

        sinks.add(num);
    }

    /**
     * Adds an operator, choosing the task class from the factory: {@code SourceStreamTask} when
     * the factory reports a stream source, {@code OneInputStreamTask} otherwise.
     *
     * @param num vertex id; the remaining arguments are forwarded unchanged to the private
     *     overload
     */
    public <IN, OUT> void addOperator(Integer num, @Nullable String str, @Nullable String str2, StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str3) {
        final Class<? extends TaskInvokable> invokableClass;
        if (streamOperatorFactory.isStreamSource()) {
            invokableClass = SourceStreamTask.class;
        } else {
            invokableClass = OneInputStreamTask.class;
        }
        addOperator(num, str, str2, streamOperatorFactory, typeInformation, typeInformation2, str3, invokableClass);
    }

    /**
     * Creates the node for a single-input operator, registers its serializers and passes the
     * type information on to the factory where the factory asks for it.
     *
     * <p>The output type is only set when the factory is output-type configurable and output
     * type information is present; the input type is set whenever the factory is input-type
     * configurable.
     */
    private <IN, OUT> void addOperator(Integer vertexId, @Nullable String str, @Nullable String str2, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String str3, Class<? extends TaskInvokable> invokableClass) {
        addNode(vertexId, str, str2, invokableClass, operatorFactory, str3);
        // Single-input operator: the slot for the second input serializer stays null.
        setSerializers(vertexId, createSerializer(inputType), null, createSerializer(outputType));

        final boolean canConfigureOutput = operatorFactory.isOutputTypeConfigurable() && outputType != null;
        if (canConfigureOutput) {
            operatorFactory.setOutputType(outputType, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inputType, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexId);
        }
    }

    /**
     * Adds a two-input operator that runs as a {@code TwoInputStreamTask}. Both input
     * serializers are created from the execution config's serializer config, and the output type
     * is passed to the factory if it is output-type configurable.
     *
     * @param num vertex id
     * @param typeInformation type information of the first input
     * @param typeInformation2 type information of the second input
     * @param typeInformation3 type information of the output
     */
    public <IN1, IN2, OUT> void addCoOperator(Integer num, String str, @Nullable String str2, StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, String str3) {
        addNode(num, str, str2, TwoInputStreamTask.class, streamOperatorFactory, str3);

        setSerializers(
                num,
                typeInformation.createSerializer(executionConfig.getSerializerConfig()),
                typeInformation2.createSerializer(executionConfig.getSerializerConfig()),
                createSerializer(typeInformation3));

        if (streamOperatorFactory.isOutputTypeConfigurable()) {
            streamOperatorFactory.setOutputType(typeInformation3, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", num);
        }
    }

    /**
     * Adds a multiple-input operator that runs as a {@code MultipleInputStreamTask}. The list of
     * input type information is handed to {@code setSerializers} together with the output
     * serializer, and the output type is passed to the factory if it is output-type configurable.
     *
     * @param num vertex id
     * @param list type information of the inputs
     * @param typeInformation type information of the output
     */
    public <OUT> void addMultipleInputOperator(Integer num, String str, @Nullable String str2, StreamOperatorFactory<OUT> streamOperatorFactory, List<TypeInformation<?>> list, TypeInformation<OUT> typeInformation, String str3) {
        addNode(num, str, str2, MultipleInputStreamTask.class, streamOperatorFactory, str3);

        setSerializers(num, list, createSerializer(typeInformation));

        final boolean outputConfigurable = streamOperatorFactory.isOutputTypeConfigurable();
        if (outputConfigurable) {
            streamOperatorFactory.setOutputType(typeInformation, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", num);
        }
    }

    /**
     * Creates a {@code StreamNode} for the given vertex id, registers it and marks the graph as
     * non-empty.
     *
     * @param num vertex id; must not be registered yet
     * @param cls task class that is passed to the node
     * @param streamOperatorFactory operator factory of the node, may be null
     * @return the newly created node
     * @throws RuntimeException if a node with the same vertex id already exists
     */
    protected StreamNode addNode(Integer num, @Nullable String str, @Nullable String str2, Class<? extends TaskInvokable> cls, @Nullable StreamOperatorFactory<?> streamOperatorFactory, String str3) {
        final boolean alreadyRegistered = streamNodes.containsKey(num);
        if (alreadyRegistered) {
            throw new RuntimeException("Duplicate vertexID " + num);
        }

        final StreamNode created = new StreamNode(num, str, str2, streamOperatorFactory, str3, cls);
        streamNodes.put(num, created);
        isEmpty = false;
        return created;
    }

    /**
     * Registers a virtual node that stands for a side output of an existing vertex.
     *
     * <p>Before registering, all existing virtual side-output nodes of the same original vertex
     * are checked: a tag with the same id but different type information is rejected.
     *
     * @param num id of the original vertex
     * @param num2 id of the virtual node to create
     * @param outputTag tag selecting the side output
     * @throws IllegalStateException if a virtual side-output node with id {@code num2} exists
     * @throws IllegalArgumentException if the same side-output id is already used with a
     *     different type on the same original vertex
     */
    public void addVirtualSideOutputNode(Integer num, Integer num2, OutputTag outputTag) {
        if (virtualSideOutputNodes.containsKey(num2)) {
            throw new IllegalStateException("Already has virtual output node with id " + num2);
        }

        for (Tuple2<Integer, OutputTag> existing : virtualSideOutputNodes.values()) {
            if (!existing.f0.equals(num)) {
                // Belongs to a different original vertex.
                continue;
            }
            OutputTag existingTag = existing.f1;
            if (!existingTag.getId().equals(outputTag.getId())) {
                continue;
            }
            if (!existingTag.getTypeInfo().equals(outputTag.getTypeInfo())) {
                throw new IllegalArgumentException("Trying to add a side output for the same side-output id with a different type. This is not allowed. Side-output ID: " + existingTag.getId());
            }
        }

        virtualSideOutputNodes.put(num2, new Tuple2<>(num, outputTag));
    }

    /**
     * Registers a virtual node that carries a partitioner and an exchange mode for edges leaving
     * an existing vertex.
     *
     * @param num id of the original vertex
     * @param num2 id of the virtual node to create
     * @param streamPartitioner partitioner stored for the virtual node
     * @param streamExchangeMode exchange mode stored for the virtual node
     * @throws IllegalStateException if a virtual partition node with id {@code num2} exists
     */
    public void addVirtualPartitionNode(Integer num, Integer num2, StreamPartitioner<?> streamPartitioner, StreamExchangeMode streamExchangeMode) {
        final boolean alreadyRegistered = virtualPartitionNodes.containsKey(num2);
        if (alreadyRegistered) {
            throw new IllegalStateException("Already has virtual partition node with id " + num2);
        }
        virtualPartitionNodes.put(num2, new Tuple3<>(num, streamPartitioner, streamExchangeMode));
    }

    /**
     * Resolves the slot sharing group of a vertex. Virtual side-output and virtual partition
     * nodes are followed recursively to their original vertex; for a real vertex the group of
     * its stream node is returned.
     *
     * @param num id of a real or virtual vertex
     * @return the slot sharing group of the underlying stream node
     */
    public String getSlotSharingGroup(Integer num) {
        if (virtualSideOutputNodes.containsKey(num)) {
            Integer originalId = virtualSideOutputNodes.get(num).f0;
            return getSlotSharingGroup(originalId);
        }
        if (virtualPartitionNodes.containsKey(num)) {
            Integer originalId = virtualPartitionNodes.get(num).f0;
            return getSlotSharingGroup(originalId);
        }
        return getStreamNode(num).getSlotSharingGroup();
    }

    /**
     * Adds an edge without an intermediate data set id.
     *
     * @param num upstream vertex id (real or virtual)
     * @param num2 downstream vertex id
     * @param i type number passed on to the edge
     */
    public void addEdge(Integer num, Integer num2, int i) {
        final IntermediateDataSetID noDataSetId = null;
        addEdge(num, num2, i, noDataSetId);
    }

    /**
     * Adds an edge with no explicit partitioner, output tag or exchange mode; these are resolved
     * from virtual nodes or defaulted while the edge is created.
     *
     * @param num upstream vertex id (real or virtual)
     * @param num2 downstream vertex id
     * @param i type number passed on to the edge
     * @param intermediateDataSetID data set id passed on to the edge, may be null
     */
    public void addEdge(Integer num, Integer num2, int i, IntermediateDataSetID intermediateDataSetID) {
        final List<String> outputNames = new ArrayList<>();
        addEdgeInternal(num, num2, i, null, outputNames, null, null, intermediateDataSetID);
    }

    /**
     * Resolves virtual upstream nodes and finally creates the real edge.
     *
     * <ul>
     *   <li>Virtual side-output node: recurse on the original vertex, using the stored output tag
     *       unless one was already supplied. The output-name list is not passed on.
     *   <li>Virtual partition node: recurse on the original vertex, using the stored partitioner
     *       unless one was already supplied. The stored exchange mode always replaces the given
     *       one.
     *   <li>Otherwise: create the actual edge between the two stream nodes.
     * </ul>
     */
    private void addEdgeInternal(Integer upstreamId, Integer downstreamId, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag, StreamExchangeMode exchangeMode, IntermediateDataSetID dataSetId) {
        if (virtualSideOutputNodes.containsKey(upstreamId)) {
            int virtualId = upstreamId;
            Integer originalId = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(originalId, downstreamId, typeNumber, partitioner, null, outputTag, exchangeMode, dataSetId);
        } else if (virtualPartitionNodes.containsKey(upstreamId)) {
            int virtualId = upstreamId;
            Integer originalId = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            StreamExchangeMode storedExchangeMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(originalId, downstreamId, typeNumber, partitioner, outputNames, outputTag, storedExchangeMode, dataSetId);
        } else {
            createActualEdge(upstreamId, downstreamId, typeNumber, partitioner, outputTag, exchangeMode, dataSetId);
        }
    }

    /**
     * Creates a {@code StreamEdge} between two real stream nodes and attaches it to both.
     *
     * <p>Without an explicit partitioner, equal parallelism yields a forward partitioner
     * ({@code ForwardForUnspecifiedPartitioner} when the graph is dynamic) and unequal parallelism
     * a {@code RebalancePartitioner}. A forward partitioner across different parallelism is
     * rejected, except for {@code ForwardForConsecutiveHashPartitioner}, which is replaced by its
     * hash partitioner. A missing exchange mode becomes {@code UNDEFINED}.
     *
     * @throws UnsupportedOperationException if forward partitioning is requested between nodes
     *     of different parallelism
     */
    private void createActualEdge(Integer upstreamId, Integer downstreamId, int typeNumber, StreamPartitioner<?> partitioner, OutputTag outputTag, StreamExchangeMode exchangeMode, IntermediateDataSetID dataSetId) {
        StreamNode upstream = getStreamNode(upstreamId);
        StreamNode downstream = getStreamNode(downstreamId);

        if (partitioner == null) {
            if (upstream.getParallelism() == downstream.getParallelism()) {
                partitioner = this.dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
            } else {
                partitioner = new RebalancePartitioner<>();
            }
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstream.getParallelism() != downstream.getParallelism()) {
                if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
                    partitioner = ((ForwardForConsecutiveHashPartitioner) partitioner).getHashPartitioner();
                } else {
                    throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism. Upstream operation: " + upstream + " parallelism: " + upstream.getParallelism() + ", downstream operation: " + downstream + " parallelism: " + downstream.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }
        }

        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }

        // The number of edges already connecting the two nodes is passed on to the new edge.
        int existingEdgeCount = getStreamEdges(upstream.getId(), downstream.getId()).size();

        StreamEdge edge = new StreamEdge(upstream, downstream, typeNumber, partitioner, outputTag, exchangeMode, existingEdgeCount, dataSetId);
        getStreamNode(Integer.valueOf(edge.getSourceId())).addOutEdge(edge);
        getStreamNode(Integer.valueOf(edge.getTargetId())).addInEdge(edge);
    }

    /**
     * Sets the parallelism of the node with the given ID; does nothing if no such node exists.
     *
     * @param num ID of the node
     * @param i parallelism to set
     */
    public void setParallelism(Integer num, int i) {
        StreamNode node = getStreamNode(num);
        if (node == null) {
            return;
        }
        node.setParallelism(Integer.valueOf(i));
    }

    /**
     * Returns the value of the {@code dynamic} flag.
     *
     * @return the current {@code dynamic} flag
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public boolean isDynamic() {
        return dynamic;
    }

    /**
     * Returns the checkpointing settings, calling {@code createJobCheckpointingSettings()} first
     * when the field is still {@code null}.
     *
     * @return the value of the {@code checkpointingSettings} field after the optional creation call
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public JobCheckpointingSettings getCheckpointingSettings() {
        JobCheckpointingSettings existing = this.checkpointingSettings;
        if (existing != null) {
            return existing;
        }
        // presumably populates this.checkpointingSettings; the field is re-read afterwards
        createJobCheckpointingSettings();
        return this.checkpointingSettings;
    }

    /**
     * Tells whether the graph is empty.
     *
     * @return {@code streamNodes.isEmpty()} while the node map is present, otherwise the value of
     *     the {@code isEmpty} field
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public boolean isEmpty() {
        if (this.streamNodes != null) {
            return this.streamNodes.isEmpty();
        }
        return this.isEmpty;
    }

    /**
     * Sets the parallelism of the node with the given ID, forwarding an extra flag to the node;
     * does nothing if no such node exists.
     *
     * @param num ID of the node
     * @param i parallelism to set
     * @param z flag passed through as the second argument of {@code StreamNode#setParallelism}
     */
    public void setParallelism(Integer num, int i, boolean z) {
        StreamNode node = getStreamNode(num);
        if (node == null) {
            return;
        }
        node.setParallelism(Integer.valueOf(i), z);
    }

    /**
     * Sets the {@code dynamic} flag.
     *
     * @param z new value of the flag
     */
    public void setDynamic(boolean z) {
        dynamic = z;
    }

    /**
     * Sets the maximum parallelism of the node with the given ID; does nothing if no such node
     * exists.
     *
     * @param i ID of the node
     * @param i2 maximum parallelism to set
     */
    public void setMaxParallelism(int i, int i2) {
        StreamNode node = getStreamNode(Integer.valueOf(i));
        if (node == null) {
            return;
        }
        node.setMaxParallelism(i2);
    }

    /**
     * Sets the resources of the node with the given ID and records the first spec in
     * {@code streamNodeMinResources}; does nothing if no such node exists.
     *
     * @param i ID of the node
     * @param resourceSpec first resource spec; also stored in {@code streamNodeMinResources}
     * @param resourceSpec2 second resource spec passed to the node
     */
    public void setResources(int i, ResourceSpec resourceSpec, ResourceSpec resourceSpec2) {
        Integer nodeId = Integer.valueOf(i);
        StreamNode node = getStreamNode(nodeId);
        if (node == null) {
            return;
        }
        node.setResources(resourceSpec, resourceSpec2);
        this.streamNodeMinResources.put(nodeId, resourceSpec);
    }

    /**
     * Forwards managed-memory use case settings to the node with the given ID; does nothing if no
     * such node exists.
     *
     * @param i ID of the node
     * @param map use case to weight mapping passed to the node
     * @param set set of use cases passed to the node
     */
    public void setManagedMemoryUseCaseWeights(int i, Map<ManagedMemoryUseCase, Integer> map, Set<ManagedMemoryUseCase> set) {
        StreamNode node = getStreamNode(Integer.valueOf(i));
        if (node == null) {
            return;
        }
        node.setManagedMemoryUseCaseWeights(map, set);
    }

    /**
     * Configures the state partitioner and state key serializer of a node.
     *
     * <p>The node is not checked for {@code null}; an unknown ID fails with a
     * {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param keySelector key selector used as the node's single state partitioner
     * @param typeSerializer serializer for the state key
     */
    public void setOneInputStateKey(Integer num, KeySelector<?, ?> keySelector, TypeSerializer<?> typeSerializer) {
        final StreamNode node = getStreamNode(num);
        node.setStatePartitioners(keySelector);
        node.setStateKeySerializer(typeSerializer);
    }

    /**
     * Configures two state partitioners and the state key serializer of a node.
     *
     * <p>The node is not checked for {@code null}; an unknown ID fails with a
     * {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param keySelector first key selector
     * @param keySelector2 second key selector
     * @param typeSerializer serializer for the state key
     */
    public void setTwoInputStateKey(Integer num, KeySelector<?, ?> keySelector, KeySelector<?, ?> keySelector2, TypeSerializer<?> typeSerializer) {
        final StreamNode node = getStreamNode(num);
        node.setStatePartitioners(keySelector, keySelector2);
        node.setStateKeySerializer(typeSerializer);
    }

    /**
     * Configures an arbitrary number of state partitioners and the state key serializer of a
     * node.
     *
     * <p>The node is not checked for {@code null}; an unknown ID fails with a
     * {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param list key selectors, copied into an array in list order
     * @param typeSerializer serializer for the state key
     */
    public void setMultipleInputStateKey(Integer num, List<KeySelector<?, ?>> list, TypeSerializer<?> typeSerializer) {
        final StreamNode node = getStreamNode(num);
        KeySelector<?, ?>[] selectors = list.toArray(new KeySelector<?, ?>[0]);
        node.setStatePartitioners(selectors);
        node.setStateKeySerializer(typeSerializer);
    }

    /**
     * Sets the buffer timeout of the node with the given ID; does nothing if no such node exists.
     *
     * @param num ID of the node
     * @param j buffer timeout value passed to the node
     */
    public void setBufferTimeout(Integer num, long j) {
        StreamNode node = getStreamNode(num);
        if (node == null) {
            return;
        }
        node.setBufferTimeout(Long.valueOf(j));
    }

    /**
     * Sets two input serializers and the output serializer of a node.
     *
     * <p>The node is not checked for {@code null}; an unknown ID fails with a
     * {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param typeSerializer first input serializer
     * @param typeSerializer2 second input serializer
     * @param typeSerializer3 output serializer
     */
    public void setSerializers(Integer num, TypeSerializer<?> typeSerializer, TypeSerializer<?> typeSerializer2, TypeSerializer<?> typeSerializer3) {
        final StreamNode node = getStreamNode(num);
        node.setSerializersIn(typeSerializer, typeSerializer2);
        node.setSerializerOut(typeSerializer3);
    }

    /**
     * Sets the input serializers, derived from type information, and the output serializer of a
     * node.
     *
     * <p>One serializer is created per list element via
     * {@code createSerializer(executionConfig.getSerializerConfig())}, in list order.
     *
     * @param num ID of the node
     * @param list type information of the inputs
     * @param typeSerializer output serializer
     * @param <OUT> output type
     */
    private <OUT> void setSerializers(Integer num, List<TypeInformation<?>> list, TypeSerializer<OUT> typeSerializer) {
        final StreamNode node = getStreamNode(num);
        TypeSerializer<?>[] inputSerializers = list.stream()
                .map(typeInformation -> typeInformation.createSerializer(this.executionConfig.getSerializerConfig()))
                .toArray(TypeSerializer[]::new);
        node.setSerializersIn(inputSerializers);
        node.setSerializerOut(typeSerializer);
    }

    /**
     * Sets the input format of a node. An unknown ID fails with a {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param inputFormat input format to set
     */
    public void setInputFormat(Integer num, InputFormat<?, ?> inputFormat) {
        final StreamNode node = getStreamNode(num);
        node.setInputFormat(inputFormat);
    }

    /**
     * Sets the output format of a node. An unknown ID fails with a {@link NullPointerException}.
     *
     * @param num ID of the node
     * @param outputFormat output format to set
     */
    public void setOutputFormat(Integer num, OutputFormat<?> outputFormat) {
        final StreamNode node = getStreamNode(num);
        node.setOutputFormat(outputFormat);
    }

    /**
     * Sets the transformation UID of the node with the given ID; does nothing if no such node
     * exists.
     *
     * @param num ID of the node
     * @param str UID to set
     */
    public void setTransformationUID(Integer num, String str) {
        StreamNode node = this.streamNodes.get(num);
        if (node == null) {
            return;
        }
        node.setTransformationUID(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the user hash of the node with the given ID; does nothing if no such node exists.
     *
     * @param num ID of the node
     * @param str user hash to set
     */
    public void setTransformationUserHash(Integer num, String str) {
        StreamNode node = this.streamNodes.get(num);
        if (node == null) {
            return;
        }
        node.setUserHash(str);
    }

    /**
     * Looks up a node by ID in {@code streamNodes}.
     *
     * @param num ID of the node
     * @return the result of {@code streamNodes.get(num)}; callers in this class treat
     *     {@code null} as "no such node"
     */
    public StreamNode getStreamNode(Integer num) {
        return streamNodes.get(num);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the IDs of all nodes.
     *
     * @return the key set of {@code streamNodes} (not a copy)
     */
    public Collection<? extends Integer> getVertexIDs() {
        return streamNodes.keySet();
    }

    /**
     * Returns the outgoing edges of a node. An unknown ID fails with a
     * {@link NullPointerException}.
     *
     * @param i ID of the node
     * @return the list returned by the node's {@code getOutEdges()}
     */
    @VisibleForTesting
    public List<StreamEdge> getStreamEdges(int i) {
        final StreamNode node = getStreamNode(Integer.valueOf(i));
        return node.getOutEdges();
    }

    /**
     * Collects the edges that lead from one node to another.
     *
     * <p>An unknown source ID fails with a {@link NullPointerException}.
     *
     * @param i ID of the source node
     * @param i2 ID of the target node
     * @return a new list with every outgoing edge of {@code i} whose target ID equals {@code i2},
     *     in out-edge order; empty if there is none
     */
    public List<StreamEdge> getStreamEdges(int i, int i2) {
        // Typed list instead of the raw ArrayList, so the return needs no unchecked conversion.
        List<StreamEdge> matchingEdges = new ArrayList<>();
        for (StreamEdge edge : getStreamNode(Integer.valueOf(i)).getOutEdges()) {
            if (edge.getTargetId() == i2) {
                matchingEdges.add(edge);
            }
        }
        return matchingEdges;
    }

    /**
     * Same as {@link #getStreamEdges(int, int)} but fails when no edge connects the two nodes.
     *
     * @param i ID of the source node
     * @param i2 ID of the target node
     * @return the non-empty list of edges from {@code i} to {@code i2}
     * @throws RuntimeException if there is no such edge
     */
    @VisibleForTesting
    @Deprecated
    public List<StreamEdge> getStreamEdgesOrThrow(int i, int i2) {
        final List<StreamEdge> edges = getStreamEdges(i, i2);
        if (!edges.isEmpty()) {
            return edges;
        }
        throw new RuntimeException("No such edge in stream graph: " + i + " -> " + i2);
    }

    /**
     * Returns the IDs of the source nodes.
     *
     * @return the {@code sources} collection itself (not a copy)
     */
    public Collection<Integer> getSourceIDs() {
        return sources;
    }

    /**
     * Returns the IDs of the sink nodes.
     *
     * @return the {@code sinks} collection itself (not a copy)
     */
    public Collection<Integer> getSinkIDs() {
        return sinks;
    }

    /**
     * Returns all nodes of the graph.
     *
     * @return the values view of {@code streamNodes}
     */
    public Collection<StreamNode> getStreamNodes() {
        return streamNodes.values();
    }

    /**
     * Looks up the broker ID stored for a vertex in {@code vertexIDtoBrokerID}.
     *
     * @param num ID of the vertex
     * @return the stored broker ID, as returned by the map lookup
     */
    public String getBrokerID(Integer num) {
        return vertexIDtoBrokerID.get(num);
    }

    /**
     * Looks up the loop timeout stored for a vertex in {@code vertexIDtoLoopTimeout}.
     *
     * <p>The stored value is unboxed, so a lookup that yields {@code null} fails with a
     * {@link NullPointerException}.
     *
     * @param num ID of the vertex
     * @return the stored timeout
     */
    public long getLoopTimeout(Integer num) {
        final Long timeout = this.vertexIDtoLoopTimeout.get(num);
        return timeout.longValue();
    }

    /**
     * Resolves the node an edge starts from.
     *
     * @param streamEdge edge whose source ID is looked up
     * @return the node registered in {@code streamNodes} under the edge's source ID
     */
    public StreamNode getSourceVertex(StreamEdge streamEdge) {
        final Integer sourceId = Integer.valueOf(streamEdge.getSourceId());
        return this.streamNodes.get(sourceId);
    }

    /**
     * Resolves the node an edge points to.
     *
     * @param streamEdge edge whose target ID is looked up
     * @return the node registered in {@code streamNodes} under the edge's target ID
     */
    public StreamNode getTargetVertex(StreamEdge streamEdge) {
        final Integer targetId = Integer.valueOf(streamEdge.getTargetId());
        return this.streamNodes.get(targetId);
    }

    /**
     * Creates the job graph using the current thread's context class loader and this graph's job
     * ID.
     *
     * @return the job graph produced by {@link #getJobGraph(ClassLoader, JobID)}
     */
    @VisibleForTesting
    public JobGraph getJobGraph() {
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        return getJobGraph(contextClassLoader, this.jobId);
    }

    /**
     * Creates the job graph using the given class loader and this graph's job ID.
     *
     * @param classLoader class loader handed to the job graph generator
     * @return the job graph produced by {@link #getJobGraph(ClassLoader, JobID)}
     */
    public JobGraph getJobGraph(ClassLoader classLoader) {
        final JobID ownJobId = this.jobId;
        return getJobGraph(classLoader, ownJobId);
    }

    /**
     * Creates the job graph by delegating to {@code StreamingJobGraphGenerator.createJobGraph}.
     *
     * @param classLoader class loader handed to the generator
     * @param jobID job ID handed to the generator, may be {@code null}
     * @return the generated job graph
     */
    public JobGraph getJobGraph(ClassLoader classLoader, @Nullable JobID jobID) {
        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(classLoader, this, jobID);
        return jobGraph;
    }

    /**
     * Renders this graph as JSON using {@code JSONGenerator}.
     *
     * @return the JSON produced by the generator
     * @throws RuntimeException wrapping any exception raised while generating the JSON
     */
    public String getStreamingPlanAsJSON() {
        try {
            final JSONGenerator generator = new JSONGenerator(this);
            return generator.getJSON();
        } catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    /**
     * Creates a serializer for the given type information using the execution config's serializer
     * config.
     *
     * @param typeInformation type information, may be {@code null}
     * @param <T> described type
     * @return the created serializer, or {@code null} when the type information is {@code null}
     *     or a {@code MissingTypeInfo}
     */
    private <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
        final boolean hasUsableTypeInfo = typeInformation != null && !(typeInformation instanceof MissingTypeInfo);
        if (hasUsableTypeInfo) {
            return typeInformation.createSerializer(this.executionConfig.getSerializerConfig());
        }
        return null;
    }

    /**
     * Sets the job type.
     *
     * @param jobType new job type
     */
    public void setJobType(JobType jobType) {
        this.jobType = jobType;
    }

    /**
     * Returns the name of this plan.
     *
     * @return the value of the {@code jobName} field
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public String getName() {
        return jobName;
    }

    /**
     * Returns the job type.
     *
     * @return the value of the {@code jobType} field
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public JobType getJobType() {
        return jobType;
    }

    /**
     * Returns the value of the {@code autoParallelismEnabled} flag.
     *
     * @return the current flag value
     */
    public boolean isAutoParallelismEnabled() {
        return autoParallelismEnabled;
    }

    /**
     * Sets the {@code autoParallelismEnabled} flag.
     *
     * @param z new value of the flag
     */
    public void setAutoParallelismEnabled(boolean z) {
        autoParallelismEnabled = z;
    }

    /**
     * Returns the vertex description mode.
     *
     * @return the value of the {@code descriptionMode} field
     */
    public PipelineOptions.VertexDescriptionMode getVertexDescriptionMode() {
        return descriptionMode;
    }

    /**
     * Sets the vertex description mode.
     *
     * @param vertexDescriptionMode new value of the {@code descriptionMode} field
     */
    public void setVertexDescriptionMode(PipelineOptions.VertexDescriptionMode vertexDescriptionMode) {
        descriptionMode = vertexDescriptionMode;
    }

    /**
     * Sets the {@code vertexNameIncludeIndexPrefix} flag.
     *
     * @param z new value of the flag
     */
    public void setVertexNameIncludeIndexPrefix(boolean z) {
        vertexNameIncludeIndexPrefix = z;
    }

    /**
     * Returns the value of the {@code vertexNameIncludeIndexPrefix} flag.
     *
     * @return the current flag value
     */
    public boolean isVertexNameIncludeIndexPrefix() {
        return vertexNameIncludeIndexPrefix;
    }

    /**
     * Registers a job status hook unless an equal hook is already registered.
     *
     * @param jobStatusHook hook to register; checked with {@code Preconditions.checkNotNull}
     */
    public void registerJobStatusHook(JobStatusHook jobStatusHook) {
        Preconditions.checkNotNull(jobStatusHook, "Registering a null JobStatusHook is not allowed. ");
        final boolean alreadyRegistered = this.jobStatusHooks.contains(jobStatusHook);
        if (!alreadyRegistered) {
            this.jobStatusHooks.add(jobStatusHook);
        }
    }

    /**
     * Returns the registered job status hooks.
     *
     * @return the {@code jobStatusHooks} list itself (not a copy)
     */
    public List<JobStatusHook> getJobStatusHooks() {
        return jobStatusHooks;
    }

    /**
     * Sets the "supports concurrent execution attempts" flag on the node with the given ID; does
     * nothing if no such node exists.
     *
     * @param num ID of the node
     * @param z flag value to set
     */
    public void setSupportsConcurrentExecutionAttempts(Integer num, boolean z) {
        final StreamNode node = getStreamNode(num);
        if (node == null) {
            return;
        }
        node.setSupportsConcurrentExecutionAttempts(z);
    }

    /**
     * Sets the attribute of the node with the given ID; does nothing if no such node exists.
     *
     * @param num ID of the node
     * @param attribute attribute to set
     */
    public void setAttribute(Integer num, Attribute attribute) {
        final StreamNode node = getStreamNode(num);
        if (node == null) {
            return;
        }
        node.setAttribute(attribute);
    }

    /**
     * Sets the job ID.
     *
     * @param jobID new value of the {@code jobId} field
     */
    public void setJobId(JobID jobID) {
        this.jobId = jobID;
    }

    /**
     * Returns the job ID.
     *
     * @return the value of the {@code jobId} field
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public JobID getJobID() {
        return jobId;
    }

    /**
     * Replaces the classpath list.
     *
     * @param list new value of the {@code classpath} field; stored as-is, not copied
     */
    public void setClasspath(List<URL> list) {
        classpath = list;
    }

    /**
     * Returns the classpath list.
     *
     * @return the value of the {@code classpath} field (not a copy)
     */
    public List<URL> getClasspath() {
        return classpath;
    }

    /**
     * Adds each URL of the list as a jar by converting it to a {@code Path} and calling
     * {@code addJar}.
     *
     * @param list URLs of the jars to add
     * @throws RuntimeException if a URL cannot be converted to a URI
     */
    public void addJars(List<URL> list) {
        for (URL jarUrl : list) {
            try {
                addJar(new Path(jarUrl.toURI()));
            } catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
    }

    /**
     * Returns the blob keys of the user jars.
     *
     * @return the {@code userJarBlobKeys} list itself (not a copy)
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public List<PermanentBlobKey> getUserJarBlobKeys() {
        return userJarBlobKeys;
    }

    /**
     * Returns the classpath list; same field as {@code getClasspath()}.
     *
     * @return the value of the {@code classpath} field (not a copy)
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public List<URL> getClasspaths() {
        return classpath;
    }

    /**
     * Registers a user artifact under the given name unless that name is already registered.
     *
     * @param str name of the artifact, used as key in {@code userArtifacts}
     * @param distributedCacheEntry entry to register; must not be {@code null}
     * @throws IllegalArgumentException if {@code distributedCacheEntry} is {@code null}
     */
    public void addUserArtifact(String str, DistributedCache.DistributedCacheEntry distributedCacheEntry) {
        if (distributedCacheEntry == null) {
            // Same exception type as before, now with a message that names the offending artifact.
            throw new IllegalArgumentException("The distributed cache entry for user artifact '" + str + "' must not be null.");
        }
        // putIfAbsent keeps an entry that is already registered under this name.
        this.userArtifacts.putIfAbsent(str, distributedCacheEntry);
    }

    /**
     * Returns the registered user artifacts.
     *
     * @return the {@code userArtifacts} map itself (not a copy)
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
        return userArtifacts;
    }

    /**
     * Adds a user jar blob key unless an equal key is already present.
     *
     * @param permanentBlobKey key to add; must not be {@code null}
     * @throws IllegalArgumentException if {@code permanentBlobKey} is {@code null}
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public void addUserJarBlobKey(PermanentBlobKey permanentBlobKey) {
        if (permanentBlobKey == null) {
            // Same exception type as before, now with a message describing the violated precondition.
            throw new IllegalArgumentException("The user jar blob key must not be null.");
        }
        if (!this.userJarBlobKeys.contains(permanentBlobKey)) {
            this.userJarBlobKeys.add(permanentBlobKey);
        }
    }

    /**
     * Attaches a serialized blob key to an already registered user artifact.
     *
     * <p>The key is serialized first; if an entry exists under {@code str} it is replaced by a new
     * entry with the same file path, executable flag and zipped flag plus the serialized key. An
     * unknown name leaves the map unchanged.
     *
     * @param str name of the artifact
     * @param permanentBlobKey blob key to serialize and attach
     * @throws IOException if serializing the blob key fails
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public void setUserArtifactBlobKey(String str, PermanentBlobKey permanentBlobKey) throws IOException {
        final byte[] serializedKey = InstantiationUtil.serializeObject(permanentBlobKey);
        this.userArtifacts.computeIfPresent(
                str,
                (name, existing) -> new DistributedCache.DistributedCacheEntry(existing.filePath, existing.isExecutable, serializedKey, existing.isZipped));
    }

    /**
     * Writes every registered user artifact into {@code jobConfiguration} via
     * {@code DistributedCache.writeFileInfoToConfig}.
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public void writeUserArtifactEntriesToConfiguration() {
        this.userArtifacts.forEach(
                (name, entry) -> DistributedCache.writeFileInfoToConfig(name, entry, this.jobConfiguration));
    }

    /**
     * Computes the largest parallelism over all nodes.
     *
     * @return the maximum of all node parallelisms, or {@code -1} when there are no nodes or no
     *     node has a parallelism above {@code -1}
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public int getMaximumParallelism() {
        int maxParallelism = -1;
        for (StreamNode node : this.streamNodes.values()) {
            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
        }
        return maxParallelism;
    }

    /**
     * Sets the initial client heartbeat timeout.
     *
     * @param j new value of the {@code initialClientHeartbeatTimeout} field
     */
    public void setInitialClientHeartbeatTimeout(long j) {
        initialClientHeartbeatTimeout = j;
    }

    /**
     * Returns the initial client heartbeat timeout.
     *
     * @return the value of the {@code initialClientHeartbeatTimeout} field
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public long getInitialClientHeartbeatTimeout() {
        return initialClientHeartbeatTimeout;
    }

    /**
     * Tells whether the recorded minimum resources are a mix of {@code ResourceSpec.UNKNOWN} and
     * other values.
     *
     * @return {@code true} as soon as both an {@code UNKNOWN} entry (compared by identity) and a
     *     non-{@code UNKNOWN} entry have been seen in {@code streamNodeMinResources}; otherwise
     *     {@code false}
     */
    @Override // org.apache.flink.streaming.api.graph.ExecutionPlan
    public boolean isPartialResourceConfigured() {
        boolean sawUnknown = false;
        boolean sawConfigured = false;
        for (ResourceSpec minResources : this.streamNodeMinResources.values()) {
            if (minResources == ResourceSpec.UNKNOWN) {
                sawUnknown = true;
            } else {
                sawConfigured = true;
            }
            if (sawUnknown && sawConfigured) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the {@code UserDefinedObjectsHolder} for this graph using a temporary thread pool.
     *
     * <p>The pool size is the smaller of the CPU core count and the execution config's
     * parallelism, but at least 1. The pool is shut down afterwards, even when constructing the
     * holder fails.
     *
     * @throws IOException declared by this method; presumably raised while the holder serializes
     *     the user-defined objects
     */
    public void serializeUserDefinedInstances() throws IOException {
        final int poolSize = Math.max(1, Math.min(Hardware.getNumberCPUCores(), getExecutionConfig().getParallelism()));
        final ExecutorService serializationExecutor = Executors.newFixedThreadPool(poolSize, new ExecutorThreadFactory("flink-operator-serialization-io"));
        try {
            this.userDefinedObjectsHolder = new UserDefinedObjectsHolder(this.streamNodes, this.virtualSideOutputNodes, this.virtualPartitionNodes, this.executionConfig, serializationExecutor);
        } finally {
            serializationExecutor.shutdown();
        }
    }

    /**
     * Delegates deserialization to the {@code userDefinedObjectsHolder}.
     *
     * <p>The holder is not checked for {@code null}.
     *
     * @param classLoader class loader passed to the holder
     * @param executor executor passed to the holder
     * @throws Exception if the holder's {@code deserialize} call fails
     */
    public void deserializeUserDefinedInstances(ClassLoader classLoader, Executor executor) throws Exception {
        final UserDefinedObjectsHolder holder = this.userDefinedObjectsHolder;
        holder.deserialize(classLoader, executor);
    }

    /**
     * Returns the nodes ordered so that a node is appended only after all of its predecessors,
     * starting from the source nodes.
     *
     * <p>The result starts with the nodes listed in {@code sources}. Each node in the result is
     * then expanded in turn with {@code addNodesThatHaveNoNewPredecessors}, which appends
     * successors that have no predecessor left in the remaining set.
     *
     * @return the ordered nodes; an empty list when the graph has no nodes
     * @throws InvalidProgramException if nodes remain after every node in the result has been
     *     expanded, which is reported as a cyclic graph
     */
    public List<StreamNode> getStreamNodesSortedTopologicallyFromSources() throws InvalidProgramException {
        if (this.streamNodes.isEmpty()) {
            return Collections.emptyList();
        }

        // Typed collections replace the raw ArrayList/LinkedHashSet so that elements are
        // StreamNode rather than Object and no unchecked conversion is needed.
        List<StreamNode> sorted = new ArrayList<>(this.streamNodes.size());
        Set<StreamNode> remaining = new LinkedHashSet<>(this.streamNodes.values());

        for (Integer sourceId : this.sources) {
            StreamNode source = getStreamNode(sourceId);
            sorted.add(source);
            remaining.remove(source);
        }

        int nextToExpand = 0;
        while (!remaining.isEmpty()) {
            // Every sorted node was expanded but some nodes could never be released.
            if (nextToExpand >= sorted.size()) {
                throw new InvalidProgramException("The stream graph is cyclic.");
            }
            StreamNode current = sorted.get(nextToExpand);
            nextToExpand++;
            addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
        }
        return sorted;
    }

    /**
     * Moves successors of {@code streamNode} from the remaining set to the sorted list once none
     * of their other predecessors is still remaining, and recurses into each moved successor.
     *
     * <p>The in-edge that is the very same object as the out-edge currently being followed is
     * ignored when looking for remaining predecessors.
     *
     * @param streamNode node whose out-edges are followed
     * @param list sorted list that released nodes are appended to
     * @param set nodes not yet placed in the sorted list; released nodes are removed from it
     */
    private void addNodesThatHaveNoNewPredecessors(StreamNode streamNode, List<StreamNode> list, Set<StreamNode> set) {
        for (StreamEdge outEdge : streamNode.getOutEdges()) {
            StreamNode target = getStreamNode(Integer.valueOf(outEdge.getTargetId()));
            if (!set.contains(target)) {
                continue;
            }

            boolean hasRemainingPredecessor = false;
            for (StreamEdge inEdge : target.getInEdges()) {
                if (inEdge != outEdge && set.contains(getStreamNode(Integer.valueOf(inEdge.getSourceId())))) {
                    hasRemainingPredecessor = true;
                    break;
                }
            }
            if (hasRemainingPredecessor) {
                continue;
            }

            list.add(target);
            set.remove(target);
            addNodesThatHaveNoNewPredecessors(target, list, set);
        }
    }

    /**
     * Collects the internal watermark declarations of this graph and stores them in serialized
     * form.
     *
     * <p>When no declarations are found, {@code serializedWatermarkDeclarations} is left
     * untouched.
     *
     * @throws StreamTaskException if serializing the declarations fails
     */
    public void serializeAndSaveWatermarkDeclarations() {
        final Set<AbstractInternalWatermarkDeclaration<?>> declarations = WatermarkUtils.getInternalWatermarkDeclarationsFromStreamGraph(this);
        if (!declarations.isEmpty()) {
            try {
                this.serializedWatermarkDeclarations = InstantiationUtil.serializeObject(declarations);
            } catch (IOException e) {
                throw new StreamTaskException("Could not serialize watermark declarations.", e);
            }
        }
    }

    /**
     * Returns the serialized watermark declarations.
     *
     * @return the {@code serializedWatermarkDeclarations} array itself (not a copy)
     */
    public byte[] getSerializedWatermarkDeclarations() {
        return serializedWatermarkDeclarations;
    }

    /**
     * Returns a short description containing the job ID.
     *
     * @return {@code "StreamGraph(jobId: <jobId>)"}
     */
    @Override
    public String toString() {
        return "StreamGraph(jobId: " + jobId + ")";
    }
}
