package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.class */
public class StreamingJobGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100;
    public static final long UNDEFINED_NETWORK_BUFFER_TIMEOUT = -1;
    private final StreamGraph streamGraph;
    private final JobGraph jobGraph;
    private final StreamGraphHasher defaultStreamGraphHasher = new StreamGraphHasherV2();
    private final List<StreamGraphHasher> legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
    private final Map<Integer, JobVertex> jobVertices = new HashMap();
    private final Collection<Integer> builtVertices = new HashSet();
    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs = new HashMap();
    private final Map<Integer, StreamConfig> vertexConfigs = new HashMap();
    private final Map<Integer, String> chainedNames = new HashMap();
    private final Map<Integer, ResourceSpec> chainedMinResources = new HashMap();
    private final Map<Integer, ResourceSpec> chainedPreferredResources = new HashMap();
    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats = new HashMap();
    private final List<StreamEdge> physicalEdgesInOrder = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy = new int[ChainingStrategy.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[ChainingStrategy.NEVER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[ChainingStrategy.ALWAYS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[ChainingStrategy.HEAD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[ChainingStrategy.HEAD_WITH_SOURCES.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode = new int[GlobalDataExchangeMode.values().length];
            try {
                $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[GlobalDataExchangeMode.ALL_EDGES_BLOCKING.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[GlobalDataExchangeMode.ALL_EDGES_PIPELINED.ordinal()] = 4;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE.ordinal()] = 5;
            } catch (NoSuchFieldError e9) {
            }
            $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode = new int[ShuffleMode.values().length];
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.PIPELINED.ordinal()] = 1;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.BATCH.ordinal()] = 2;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.UNDEFINED.ordinal()] = 3;
            } catch (NoSuchFieldError e12) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$ChainedSourceInfo.class */
    public static final class ChainedSourceInfo {
        private final StreamConfig operatorConfig;
        private final StreamConfig.SourceInputConfig inputConfig;

        ChainedSourceInfo(StreamConfig streamConfig, StreamConfig.SourceInputConfig sourceInputConfig) {
            this.operatorConfig = streamConfig;
            this.inputConfig = sourceInputConfig;
        }

        public StreamConfig getOperatorConfig() {
            return this.operatorConfig;
        }

        public StreamConfig.SourceInputConfig getInputConfig() {
            return this.inputConfig;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$OperatorChainInfo.class */
    public static class OperatorChainInfo {
        private final Integer startNodeId;
        private final Map<Integer, byte[]> hashes;
        private final List<Map<Integer, byte[]>> legacyHashes;
        private final Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes;
        private final Map<Integer, ChainedSourceInfo> chainedSources;
        private final List<OperatorCoordinator.Provider> coordinatorProviders;
        private final StreamGraph streamGraph;

        private OperatorChainInfo(int i, Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, Map<Integer, ChainedSourceInfo> map2, StreamGraph streamGraph) {
            this.startNodeId = Integer.valueOf(i);
            this.hashes = map;
            this.legacyHashes = list;
            this.chainedOperatorHashes = new HashMap();
            this.coordinatorProviders = new ArrayList();
            this.chainedSources = map2;
            this.streamGraph = streamGraph;
        }

        byte[] getHash(Integer num) {
            return this.hashes.get(num);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Integer getStartNodeId() {
            return this.startNodeId;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<Tuple2<byte[], byte[]>> getChainedOperatorHashes(int i) {
            return this.chainedOperatorHashes.get(Integer.valueOf(i));
        }

        void addCoordinatorProvider(OperatorCoordinator.Provider provider) {
            this.coordinatorProviders.add(provider);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<OperatorCoordinator.Provider> getCoordinatorProviders() {
            return this.coordinatorProviders;
        }

        Map<Integer, ChainedSourceInfo> getChainedSources() {
            return this.chainedSources;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public OperatorID addNodeToChain(int i, String str) {
            List<Tuple2<byte[], byte[]>> computeIfAbsent = this.chainedOperatorHashes.computeIfAbsent(this.startNodeId, num -> {
                return new ArrayList();
            });
            byte[] bArr = this.hashes.get(Integer.valueOf(i));
            Iterator<Map<Integer, byte[]>> it = this.legacyHashes.iterator();
            while (it.hasNext()) {
                computeIfAbsent.add(new Tuple2<>(bArr, it.next().get(Integer.valueOf(i))));
            }
            Optional<OperatorCoordinator.Provider> coordinatorProvider = this.streamGraph.getStreamNode(Integer.valueOf(i)).getCoordinatorProvider(str, new OperatorID(getHash(Integer.valueOf(i))));
            List<OperatorCoordinator.Provider> list = this.coordinatorProviders;
            list.getClass();
            coordinatorProvider.map((v1) -> {
                return r1.add(v1);
            });
            return new OperatorID(bArr);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public OperatorChainInfo newChain(Integer num) {
            return new OperatorChainInfo(num.intValue(), this.hashes, this.legacyHashes, this.chainedSources, this.streamGraph);
        }

        /* synthetic */ OperatorChainInfo(int i, Map map, List list, Map map2, StreamGraph streamGraph, AnonymousClass1 anonymousClass1) {
            this(i, map, list, map2, streamGraph);
        }
    }

    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return createJobGraph(streamGraph, null);
    }

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
        this.streamGraph = streamGraph;
        this.jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

    private JobGraph createJobGraph() {
        preValidate();
        this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode());
        this.jobGraph.enableApproximateLocalRecovery(this.streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
        Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
        ArrayList arrayList = new ArrayList(this.legacyStreamGraphHashers.size());
        Iterator<StreamGraphHasher> it = this.legacyStreamGraphHashers.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }
        setChaining(traverseStreamGraphAndGenerateHashes, arrayList);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        setManagedMemoryFraction(Collections.unmodifiableMap(this.jobVertices), Collections.unmodifiableMap(this.vertexConfigs), Collections.unmodifiableMap(this.chainedConfigs), num -> {
            return this.streamGraph.getStreamNode(num).getManagedMemoryOperatorScopeUseCaseWeights();
        }, num2 -> {
            return this.streamGraph.getStreamNode(num2).getManagedMemorySlotScopeUseCases();
        });
        configureCheckpointing();
        this.jobGraph.setSavepointRestoreSettings(this.streamGraph.getSavepointRestoreSettings());
        JobGraphUtils.addUserArtifactEntries(this.streamGraph.getUserArtifacts(), this.jobGraph);
        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
            return this.jobGraph;
        } catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig.This indicates that non-serializable types (like custom serializers) were registered");
        }
    }

    private void preValidate() {
        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        if (checkpointConfig.isCheckpointingEnabled()) {
            if (this.streamGraph.isIterative() && !checkpointConfig.isForceCheckpointing()) {
                throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
            }
            if (this.streamGraph.isIterative() && checkpointConfig.isUnalignedCheckpointsEnabled() && !checkpointConfig.isForceUnalignedCheckpoints()) {
                throw new UnsupportedOperationException("Unaligned Checkpoints are currently not supported for iterative jobs,  as rescaling would require alignment (in addition to the reduced checkpointing guarantees).\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'");
            }
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Iterator<StreamNode> it = this.streamGraph.getStreamNodes().iterator();
            while (it.hasNext()) {
                StreamOperatorFactory<?> operatorFactory = it.next().getOperatorFactory();
                if (operatorFactory != null) {
                    Class<? extends StreamOperator> streamOperatorClass = operatorFactory.getStreamOperatorClass(contextClassLoader);
                    if (InputSelectable.class.isAssignableFrom(streamOperatorClass)) {
                        throw new UnsupportedOperationException("Checkpointing is currently not supported for operators that implement InputSelectable:" + streamOperatorClass.getName());
                    }
                }
            }
        }
        if (!checkpointConfig.isUnalignedCheckpointsEnabled() || getCheckpointingMode(checkpointConfig) == CheckpointingMode.EXACTLY_ONCE) {
            return;
        }
        LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE");
        checkpointConfig.enableUnalignedCheckpoints(false);
    }

    private void setPhysicalEdges() {
        HashMap hashMap = new HashMap();
        for (StreamEdge streamEdge : this.physicalEdgesInOrder) {
            ((List) hashMap.computeIfAbsent(Integer.valueOf(streamEdge.getTargetId()), num -> {
                return new ArrayList();
            })).add(streamEdge);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            int intValue = ((Integer) entry.getKey()).intValue();
            this.vertexConfigs.get(Integer.valueOf(intValue)).setInPhysicalEdges((List) entry.getValue());
        }
    }

    private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Integer num : this.streamGraph.getSourceIDs()) {
            StreamNode streamNode = this.streamGraph.getStreamNode(num);
            if ((streamNode.getOperatorFactory() instanceof SourceOperatorFactory) && streamNode.getOutEdges().size() == 1) {
                StreamEdge streamEdge = streamNode.getOutEdges().get(0);
                if (this.streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())).getOperatorFactory().getChainingStrategy() == ChainingStrategy.HEAD_WITH_SOURCES && isChainableInput(streamEdge, this.streamGraph)) {
                    OperatorID operatorID = new OperatorID(map.get(num));
                    StreamConfig.SourceInputConfig sourceInputConfig = new StreamConfig.SourceInputConfig(streamEdge);
                    StreamConfig streamConfig = new StreamConfig(new Configuration());
                    setVertexConfig(num, streamConfig, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
                    streamConfig.setChainIndex(0);
                    streamConfig.setOperatorID(operatorID);
                    streamConfig.setOperatorName(streamNode.getOperatorName());
                    hashMap.put(num, new ChainedSourceInfo(streamConfig, sourceInputConfig));
                    ((OperatorChainInfo) hashMap2.computeIfAbsent(Integer.valueOf(streamEdge.getTargetId()), num2 -> {
                        return new OperatorChainInfo(streamEdge.getTargetId(), map, list, hashMap, this.streamGraph, null);
                    })).addCoordinatorProvider(((SourceOperatorFactory) streamNode.getOperatorFactory()).getCoordinatorProvider(streamNode.getOperatorName(), operatorID));
                }
            }
            hashMap2.put(num, new OperatorChainInfo(num.intValue(), map, list, hashMap, this.streamGraph, null));
        }
        return hashMap2;
    }

    private void setChaining(Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list) {
        Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs = buildChainedInputsAndGetHeadInputs(map, list);
        for (OperatorChainInfo operatorChainInfo : (Collection) buildChainedInputsAndGetHeadInputs.entrySet().stream().sorted(Comparator.comparing((v0) -> {
            return v0.getKey();
        })).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList())) {
            createChain(operatorChainInfo.getStartNodeId(), 1, operatorChainInfo, buildChainedInputsAndGetHeadInputs);
        }
    }

    private List<StreamEdge> createChain(Integer num, int i, OperatorChainInfo operatorChainInfo, Map<Integer, OperatorChainInfo> map) {
        Integer startNodeId = operatorChainInfo.getStartNodeId();
        if (this.builtVertices.contains(startNodeId)) {
            return new ArrayList();
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        StreamNode streamNode = this.streamGraph.getStreamNode(num);
        for (StreamEdge streamEdge : streamNode.getOutEdges()) {
            if (isChainable(streamEdge, this.streamGraph)) {
                arrayList2.add(streamEdge);
            } else {
                arrayList3.add(streamEdge);
            }
        }
        Iterator<StreamEdge> it = arrayList2.iterator();
        while (it.hasNext()) {
            arrayList.addAll(createChain(Integer.valueOf(it.next().getTargetId()), i + 1, operatorChainInfo, map));
        }
        for (StreamEdge streamEdge2 : arrayList3) {
            arrayList.add(streamEdge2);
            createChain(Integer.valueOf(streamEdge2.getTargetId()), 1, map.computeIfAbsent(Integer.valueOf(streamEdge2.getTargetId()), num2 -> {
                return operatorChainInfo.newChain(Integer.valueOf(streamEdge2.getTargetId()));
            }), map);
        }
        this.chainedNames.put(num, createChainedName(num, arrayList2, Optional.ofNullable(map.get(num))));
        this.chainedMinResources.put(num, createChainedMinResources(num, arrayList2));
        this.chainedPreferredResources.put(num, createChainedPreferredResources(num, arrayList2));
        OperatorID addNodeToChain = operatorChainInfo.addNodeToChain(num.intValue(), this.chainedNames.get(num));
        if (streamNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(addNodeToChain, streamNode.getInputFormat());
        }
        if (streamNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(addNodeToChain, streamNode.getOutputFormat());
        }
        StreamConfig createJobVertex = num.equals(startNodeId) ? createJobVertex(startNodeId, operatorChainInfo) : new StreamConfig(new Configuration());
        setVertexConfig(num, createJobVertex, arrayList2, arrayList3, operatorChainInfo.getChainedSources());
        if (num.equals(startNodeId)) {
            createJobVertex.setChainStart();
            createJobVertex.setChainIndex(i);
            createJobVertex.setOperatorName(this.streamGraph.getStreamNode(num).getOperatorName());
            Iterator<StreamEdge> it2 = arrayList.iterator();
            while (it2.hasNext()) {
                connect(startNodeId, it2.next());
            }
            createJobVertex.setOutEdgesInOrder(arrayList);
            createJobVertex.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(startNodeId));
        } else {
            this.chainedConfigs.computeIfAbsent(startNodeId, num3 -> {
                return new HashMap();
            });
            createJobVertex.setChainIndex(i);
            createJobVertex.setOperatorName(this.streamGraph.getStreamNode(num).getOperatorName());
            this.chainedConfigs.get(startNodeId).put(num, createJobVertex);
        }
        createJobVertex.setOperatorID(addNodeToChain);
        if (arrayList2.isEmpty()) {
            createJobVertex.setChainEnd();
        }
        return arrayList;
    }

    private InputOutputFormatContainer getOrCreateFormatContainer(Integer num) {
        return this.chainedInputOutputFormats.computeIfAbsent(num, num2 -> {
            return new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader());
        });
    }

    private String createChainedName(Integer num, List<StreamEdge> list, Optional<OperatorChainInfo> optional) {
        String nameWithChainedSourcesInfo = nameWithChainedSourcesInfo(this.streamGraph.getStreamNode(num).getOperatorName(), (Collection) optional.map(operatorChainInfo -> {
            return operatorChainInfo.getChainedSources().values();
        }).orElse(Collections.emptyList()));
        if (list.size() <= 1) {
            return list.size() == 1 ? nameWithChainedSourcesInfo + " -> " + this.chainedNames.get(Integer.valueOf(list.get(0).getTargetId())) : nameWithChainedSourcesInfo;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<StreamEdge> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.chainedNames.get(Integer.valueOf(it.next().getTargetId())));
        }
        return nameWithChainedSourcesInfo + " -> (" + StringUtils.join(arrayList, ", ") + ")";
    }

    private ResourceSpec createChainedMinResources(Integer num, List<StreamEdge> list) {
        ResourceSpec minResources = this.streamGraph.getStreamNode(num).getMinResources();
        Iterator<StreamEdge> it = list.iterator();
        while (it.hasNext()) {
            minResources = minResources.merge(this.chainedMinResources.get(Integer.valueOf(it.next().getTargetId())));
        }
        return minResources;
    }

    private ResourceSpec createChainedPreferredResources(Integer num, List<StreamEdge> list) {
        ResourceSpec preferredResources = this.streamGraph.getStreamNode(num).getPreferredResources();
        Iterator<StreamEdge> it = list.iterator();
        while (it.hasNext()) {
            preferredResources = preferredResources.merge(this.chainedPreferredResources.get(Integer.valueOf(it.next().getTargetId())));
        }
        return preferredResources;
    }

    private StreamConfig createJobVertex(Integer num, OperatorChainInfo operatorChainInfo) {
        JobVertex jobVertex;
        StreamNode streamNode = this.streamGraph.getStreamNode(num);
        byte[] hash = operatorChainInfo.getHash(num);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexID = new JobVertexID(hash);
        List<Tuple2> chainedOperatorHashes = operatorChainInfo.getChainedOperatorHashes(num.intValue());
        ArrayList arrayList = new ArrayList();
        if (chainedOperatorHashes != null) {
            for (Tuple2 tuple2 : chainedOperatorHashes) {
                arrayList.add(OperatorIDPair.of(new OperatorID((byte[]) tuple2.f0), tuple2.f1 == null ? null : new OperatorID((byte[]) tuple2.f1)));
            }
        }
        if (this.chainedInputOutputFormats.containsKey(num)) {
            jobVertex = new InputOutputFormatVertex(this.chainedNames.get(num), jobVertexID, arrayList);
            this.chainedInputOutputFormats.get(num).write(new TaskConfig(jobVertex.getConfiguration()));
        } else {
            jobVertex = new JobVertex(this.chainedNames.get(num), jobVertexID, arrayList);
        }
        Iterator it = operatorChainInfo.getCoordinatorProviders().iterator();
        while (it.hasNext()) {
            try {
                jobVertex.addOperatorCoordinator(new SerializedValue((OperatorCoordinator.Provider) it.next()));
            } catch (IOException e) {
                throw new FlinkRuntimeException(String.format("Coordinator Provider for node %s is not serializable.", this.chainedNames.get(num)), e);
            }
        }
        jobVertex.setResources(this.chainedMinResources.get(num), this.chainedPreferredResources.get(num));
        jobVertex.setInvokableClass(streamNode.getJobVertexClass());
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }
        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", Integer.valueOf(parallelism), num);
        }
        jobVertex.setInputDependencyConstraint(this.streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
        this.jobVertices.put(num, jobVertex);
        this.builtVertices.add(num);
        this.jobGraph.addVertex(jobVertex);
        return new StreamConfig(jobVertex.getConfiguration());
    }

    private void setVertexConfig(Integer num, StreamConfig streamConfig, List<StreamEdge> list, List<StreamEdge> list2, Map<Integer, ChainedSourceInfo> map) {
        StreamNode streamNode = this.streamGraph.getStreamNode(num);
        streamConfig.setVertexID(num);
        List<StreamEdge> inEdges = streamNode.getInEdges();
        TypeSerializer<?>[] typeSerializersIn = streamNode.getTypeSerializersIn();
        StreamConfig.InputConfig[] inputConfigArr = new StreamConfig.InputConfig[typeSerializersIn.length];
        int i = 0;
        for (StreamEdge streamEdge : inEdges) {
            ChainedSourceInfo chainedSourceInfo = map.get(Integer.valueOf(streamEdge.getSourceId()));
            int typeNumber = streamEdge.getTypeNumber() == 0 ? 0 : streamEdge.getTypeNumber() - 1;
            if (chainedSourceInfo != null) {
                if (inputConfigArr[typeNumber] != null) {
                    throw new IllegalStateException("Trying to union a chained source with another input.");
                }
                inputConfigArr[typeNumber] = chainedSourceInfo.getInputConfig();
                this.chainedConfigs.computeIfAbsent(num, num2 -> {
                    return new HashMap();
                }).put(Integer.valueOf(streamEdge.getSourceId()), chainedSourceInfo.getOperatorConfig());
            } else if (inputConfigArr[typeNumber] == null) {
                int i2 = i;
                i++;
                inputConfigArr[typeNumber] = new StreamConfig.NetworkInputConfig(typeSerializersIn[typeNumber], i2, streamNode.getInputRequirements().getOrDefault(Integer.valueOf(typeNumber), StreamConfig.InputRequirement.PASS_THROUGH));
            }
        }
        streamConfig.setInputs(inputConfigArr);
        streamConfig.setTypeSerializerOut(streamNode.getTypeSerializerOut());
        for (StreamEdge streamEdge2 : list) {
            if (streamEdge2.getOutputTag() != null) {
                streamConfig.setTypeSerializerSideOut(streamEdge2.getOutputTag(), streamEdge2.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
            }
        }
        for (StreamEdge streamEdge3 : list2) {
            if (streamEdge3.getOutputTag() != null) {
                streamConfig.setTypeSerializerSideOut(streamEdge3.getOutputTag(), streamEdge3.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
            }
        }
        streamConfig.setStreamOperatorFactory(streamNode.getOperatorFactory());
        streamConfig.setNumberOfOutputs(list2.size());
        streamConfig.setNonChainedOutputs(list2);
        streamConfig.setChainedOutputs(list);
        streamConfig.setTimeCharacteristic(this.streamGraph.getTimeCharacteristic());
        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        streamConfig.setStateBackend(this.streamGraph.getStateBackend());
        streamConfig.setGraphContainingLoops(this.streamGraph.isIterative());
        streamConfig.setTimerServiceProvider(this.streamGraph.getTimerServiceProvider());
        streamConfig.setCheckpointingEnabled(checkpointConfig.isCheckpointingEnabled());
        streamConfig.setCheckpointMode(getCheckpointingMode(checkpointConfig));
        streamConfig.setUnalignedCheckpointsEnabled(checkpointConfig.isUnalignedCheckpointsEnabled());
        streamConfig.setAlignmentTimeout(checkpointConfig.getAlignmentTimeout());
        for (int i3 = 0; i3 < streamNode.getStatePartitioners().length; i3++) {
            streamConfig.setStatePartitioner(i3, streamNode.getStatePartitioners()[i3]);
        }
        streamConfig.setStateKeySerializer(streamNode.getStateKeySerializer());
        Class<? extends AbstractInvokable> jobVertexClass = streamNode.getJobVertexClass();
        if (jobVertexClass.equals(StreamIterationHead.class) || jobVertexClass.equals(StreamIterationTail.class)) {
            streamConfig.setIterationId(this.streamGraph.getBrokerID(num));
            streamConfig.setIterationWaitTime(this.streamGraph.getLoopTimeout(num));
        }
        this.vertexConfigs.put(num, streamConfig);
    }

    private CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) {
        CheckpointingMode checkpointingMode = checkpointConfig.getCheckpointingMode();
        Preconditions.checkArgument(checkpointingMode == CheckpointingMode.EXACTLY_ONCE || checkpointingMode == CheckpointingMode.AT_LEAST_ONCE, "Unexpected checkpointing mode.");
        return checkpointConfig.isCheckpointingEnabled() ? checkpointingMode : CheckpointingMode.AT_LEAST_ONCE;
    }

    private void connect(Integer num, StreamEdge streamEdge) {
        ResultPartitionType determineResultPartitionType;
        this.physicalEdgesInOrder.add(streamEdge);
        Integer valueOf = Integer.valueOf(streamEdge.getTargetId());
        JobVertex jobVertex = this.jobVertices.get(num);
        JobVertex jobVertex2 = this.jobVertices.get(valueOf);
        StreamConfig streamConfig = new StreamConfig(jobVertex2.getConfiguration());
        streamConfig.setNumberOfNetworkInputs(streamConfig.getNumberOfNetworkInputs() + 1);
        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[streamEdge.getShuffleMode().ordinal()]) {
            case CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS /* 1 */:
                determineResultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                determineResultPartitionType = ResultPartitionType.BLOCKING;
                break;
            case 3:
                determineResultPartitionType = determineResultPartitionType(partitioner);
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " + streamEdge.getShuffleMode() + " is not supported yet.");
        }
        checkAndResetBufferTimeout(determineResultPartitionType, streamEdge);
        JobEdge connectNewDataSetAsInput = isPointwisePartitioner(partitioner) ? jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, determineResultPartitionType) : jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, determineResultPartitionType);
        connectNewDataSetAsInput.setShipStrategyName(partitioner.toString());
        connectNewDataSetAsInput.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
        connectNewDataSetAsInput.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), num, valueOf});
        }
    }

    private void checkAndResetBufferTimeout(ResultPartitionType resultPartitionType, StreamEdge streamEdge) {
        long bufferTimeout = streamEdge.getBufferTimeout();
        if (resultPartitionType.isBlocking() && bufferTimeout != -1) {
            throw new UnsupportedOperationException("Blocking partition does not support buffer timeout " + bufferTimeout + " for src operator in edge " + streamEdge.toString() + ". \nPlease either reset buffer timeout as -1 or use the non-blocking partition.");
        }
        if (resultPartitionType.isPipelined() && bufferTimeout == -1) {
            streamEdge.setBufferTimeout(DEFAULT_NETWORK_BUFFER_TIMEOUT);
        }
    }

    private static boolean isPointwisePartitioner(StreamPartitioner<?> streamPartitioner) {
        return (streamPartitioner instanceof ForwardPartitioner) || (streamPartitioner instanceof RescalePartitioner);
    }

    private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> streamPartitioner) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$graph$GlobalDataExchangeMode[this.streamGraph.getGlobalDataExchangeMode().ordinal()]) {
            case CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS /* 1 */:
                return ResultPartitionType.BLOCKING;
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                return streamPartitioner instanceof ForwardPartitioner ? ResultPartitionType.PIPELINED_BOUNDED : ResultPartitionType.BLOCKING;
            case 3:
                return isPointwisePartitioner(streamPartitioner) ? ResultPartitionType.PIPELINED_BOUNDED : ResultPartitionType.BLOCKING;
            case 4:
                return ResultPartitionType.PIPELINED_BOUNDED;
            case 5:
                return ResultPartitionType.PIPELINED_APPROXIMATE;
            default:
                throw new RuntimeException("Unrecognized global data exchange mode " + this.streamGraph.getGlobalDataExchangeMode());
        }
    }

    public static boolean isChainable(StreamEdge streamEdge, StreamGraph streamGraph) {
        return streamGraph.getTargetVertex(streamEdge).getInEdges().size() == 1 && isChainableInput(streamEdge, streamGraph);
    }

    private static boolean isChainableInput(StreamEdge streamEdge, StreamGraph streamGraph) {
        StreamNode sourceVertex = streamGraph.getSourceVertex(streamEdge);
        StreamNode targetVertex = streamGraph.getTargetVertex(streamEdge);
        if (!sourceVertex.isSameSlotSharingGroup(targetVertex) || !areOperatorsChainable(sourceVertex, targetVertex, streamGraph) || !(streamEdge.getPartitioner() instanceof ForwardPartitioner) || streamEdge.getShuffleMode() == ShuffleMode.BATCH || sourceVertex.getParallelism() != targetVertex.getParallelism() || !streamGraph.isChainingEnabled()) {
            return false;
        }
        for (StreamEdge streamEdge2 : targetVertex.getInEdges()) {
            if (streamEdge2 != streamEdge && streamEdge2.getTypeNumber() == streamEdge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }

    @VisibleForTesting
    static boolean areOperatorsChainable(StreamNode streamNode, StreamNode streamNode2, StreamGraph streamGraph) {
        boolean z;
        StreamOperatorFactory<?> operatorFactory = streamNode.getOperatorFactory();
        StreamOperatorFactory<?> operatorFactory2 = streamNode2.getOperatorFactory();
        if (operatorFactory2 == null || operatorFactory == null) {
            return false;
        }
        if ((operatorFactory2 instanceof YieldingOperatorFactory) && getHeadOperator(streamNode, streamGraph).isLegacySource()) {
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[operatorFactory.getChainingStrategy().ordinal()]) {
            case CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS /* 1 */:
                z = false;
                break;
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
            case 3:
            case 4:
                z = true;
                break;
            default:
                throw new RuntimeException("Unknown chaining strategy: " + operatorFactory.getChainingStrategy());
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$ChainingStrategy[operatorFactory2.getChainingStrategy().ordinal()]) {
            case CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS /* 1 */:
            case 3:
                z = false;
                break;
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                break;
            case 4:
                z &= operatorFactory instanceof SourceOperatorFactory;
                break;
            default:
                throw new RuntimeException("Unknown chaining strategy: " + operatorFactory.getChainingStrategy());
        }
        return z;
    }

    private static StreamOperatorFactory<?> getHeadOperator(StreamNode streamNode, StreamGraph streamGraph) {
        return (streamNode.getInEdges().size() == 1 && isChainable(streamNode.getInEdges().get(0), streamGraph)) ? getHeadOperator(streamGraph.getSourceVertex(streamNode.getInEdges().get(0)), streamGraph) : streamNode.getOperatorFactory();
    }

    private void setSlotSharingAndCoLocation() {
        setSlotSharing();
        setCoLocation();
    }

    private void setSlotSharing() {
        HashMap hashMap = new HashMap();
        Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups = buildVertexRegionSlotSharingGroups();
        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            JobVertex value = entry.getValue();
            String slotSharingGroup = this.streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
            Preconditions.checkNotNull(slotSharingGroup, "StreamNode slot sharing group must not be null");
            value.setSlotSharingGroup((SlotSharingGroup) (slotSharingGroup.equals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP) ? Preconditions.checkNotNull(buildVertexRegionSlotSharingGroups.get(value.getID())) : hashMap.computeIfAbsent(slotSharingGroup, str -> {
                return new SlotSharingGroup();
            })));
        }
    }

    private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() {
        HashMap hashMap = new HashMap();
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        boolean isAllVerticesInSameSlotSharingGroupByDefault = this.streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();
        for (DefaultLogicalPipelinedRegion defaultLogicalPipelinedRegion : new DefaultLogicalTopology(this.jobGraph).getLogicalPipelinedRegions()) {
            SlotSharingGroup slotSharingGroup2 = isAllVerticesInSameSlotSharingGroupByDefault ? slotSharingGroup : new SlotSharingGroup();
            Iterator it = defaultLogicalPipelinedRegion.getVertexIDs().iterator();
            while (it.hasNext()) {
                hashMap.put((JobVertexID) it.next(), slotSharingGroup2);
            }
        }
        return hashMap;
    }

    private void setCoLocation() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            StreamNode streamNode = this.streamGraph.getStreamNode(entry.getKey());
            JobVertex value = entry.getValue();
            SlotSharingGroup slotSharingGroup = value.getSlotSharingGroup();
            String coLocationGroup = streamNode.getCoLocationGroup();
            if (coLocationGroup != null) {
                if (slotSharingGroup == null) {
                    throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
                }
                Tuple2 tuple2 = (Tuple2) hashMap.computeIfAbsent(coLocationGroup, str -> {
                    return new Tuple2(slotSharingGroup, new CoLocationGroup());
                });
                if (tuple2.f0 != slotSharingGroup) {
                    throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
                }
                value.updateCoLocationGroup((CoLocationGroup) tuple2.f1);
                ((CoLocationGroup) tuple2.f1).addVertex(value);
            }
        }
    }

    private static void setManagedMemoryFraction(Map<Integer, JobVertex> map, Map<Integer, StreamConfig> map2, Map<Integer, Map<Integer, StreamConfig>> map3, Function<Integer, Map<ManagedMemoryUseCase, Integer>> function, Function<Integer, Set<ManagedMemoryUseCase>> function2) {
        Set newSetFromMap = Collections.newSetFromMap(new IdentityHashMap());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry<Integer, JobVertex> entry : map.entrySet()) {
            int intValue = entry.getKey().intValue();
            JobVertex value = entry.getValue();
            SlotSharingGroup slotSharingGroup = value.getSlotSharingGroup();
            Preconditions.checkState(slotSharingGroup != null, "JobVertex slot sharing group must not be null");
            newSetFromMap.add(slotSharingGroup);
            hashMap.put(value.getID(), Integer.valueOf(intValue));
            HashSet hashSet = new HashSet();
            hashSet.add(Integer.valueOf(intValue));
            hashSet.addAll(map3.getOrDefault(Integer.valueOf(intValue), Collections.emptyMap()).keySet());
            hashMap2.put(value.getID(), hashSet);
        }
        Iterator it = newSetFromMap.iterator();
        while (it.hasNext()) {
            setManagedMemoryFractionForSlotSharingGroup((SlotSharingGroup) it.next(), hashMap, hashMap2, map2, map3, function, function2);
        }
    }

    private static void setManagedMemoryFractionForSlotSharingGroup(SlotSharingGroup slotSharingGroup, Map<JobVertexID, Integer> map, Map<JobVertexID, Set<Integer>> map2, Map<Integer, StreamConfig> map3, Map<Integer, Map<Integer, StreamConfig>> map4, Function<Integer, Map<ManagedMemoryUseCase, Integer>> function, Function<Integer, Set<ManagedMemoryUseCase>> function2) {
        Set set = (Set) slotSharingGroup.getJobVertexIds().stream().flatMap(jobVertexID -> {
            return ((Set) map2.get(jobVertexID)).stream();
        }).collect(Collectors.toSet());
        Map map5 = (Map) set.stream().flatMap(num -> {
            return ((Map) function.apply(num)).entrySet().stream();
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getKey();
        }, Collectors.summingInt((v0) -> {
            return v0.getValue();
        })));
        Set set2 = (Set) set.stream().flatMap(num2 -> {
            return ((Set) function2.apply(num2)).stream();
        }).collect(Collectors.toSet());
        for (JobVertexID jobVertexID2 : slotSharingGroup.getJobVertexIds()) {
            Iterator<Integer> it = map2.get(jobVertexID2).iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                StreamConfig streamConfig = map3.get(Integer.valueOf(intValue));
                setManagedMemoryFractionForOperator(slotSharingGroup.getResourceSpec(), function.apply(Integer.valueOf(intValue)), function2.apply(Integer.valueOf(intValue)), map5, set2, streamConfig);
            }
            int intValue2 = map.get(jobVertexID2).intValue();
            map3.get(Integer.valueOf(intValue2)).setTransitiveChainedTaskConfigs(map4.get(Integer.valueOf(intValue2)));
        }
    }

    private static void setManagedMemoryFractionForOperator(ResourceSpec resourceSpec, Map<ManagedMemoryUseCase, Integer> map, Set<ManagedMemoryUseCase> set, Map<ManagedMemoryUseCase, Integer> map2, Set<ManagedMemoryUseCase> set2, StreamConfig streamConfig) {
        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
            LOG.error("Failed setting managed memory fractions.  Operators may not be able to use managed memory properly. Calculating managed memory fractions with fine grained resource spec is currently not supported.");
            return;
        }
        for (Map.Entry<ManagedMemoryUseCase, Integer> entry : map2.entrySet()) {
            ManagedMemoryUseCase key = entry.getKey();
            int intValue = entry.getValue().intValue();
            int intValue2 = map.getOrDefault(key, 0).intValue();
            streamConfig.setManagedMemoryFractionOperatorOfUseCase(key, intValue2 > 0 ? ManagedMemoryUtils.getFractionRoundedDown(intValue2, intValue) : 0.0d);
        }
        for (ManagedMemoryUseCase managedMemoryUseCase : set2) {
            streamConfig.setManagedMemoryFractionOperatorOfUseCase(managedMemoryUseCase, set.contains(managedMemoryUseCase) ? 1.0d : 0.0d);
        }
    }

    private void configureCheckpointing() {
        CheckpointRetentionPolicy checkpointRetentionPolicy;
        SerializedValue serializedValue;
        SerializedValue serializedValue2;
        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        long checkpointInterval = checkpointConfig.getCheckpointInterval();
        if (checkpointInterval < 10) {
            checkpointInterval = Long.MAX_VALUE;
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList(this.jobVertices.size());
        ArrayList arrayList3 = new ArrayList(this.jobVertices.size());
        for (JobVertex jobVertex : this.jobVertices.values()) {
            if (jobVertex.isInputVertex()) {
                arrayList.add(jobVertex.getID());
            }
            arrayList3.add(jobVertex.getID());
            arrayList2.add(jobVertex.getID());
        }
        if (checkpointConfig.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup externalizedCheckpointCleanup = checkpointConfig.getExternalizedCheckpointCleanup();
            if (externalizedCheckpointCleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            checkpointRetentionPolicy = externalizedCheckpointCleanup.deleteOnCancellation() ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            checkpointRetentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }
        ArrayList arrayList4 = new ArrayList();
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            if (streamNode.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
                WithMasterCheckpointHook userFunction = ((UdfStreamOperatorFactory) streamNode.getOperatorFactory()).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    arrayList4.add(new FunctionMasterCheckpointHookFactory(userFunction));
                }
            }
        }
        if (arrayList4.isEmpty()) {
            serializedValue = null;
        } else {
            try {
                serializedValue = new SerializedValue((MasterTriggerRestoreHook.Factory[]) arrayList4.toArray(new MasterTriggerRestoreHook.Factory[arrayList4.size()]));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }
        if (this.streamGraph.getStateBackend() == null) {
            serializedValue2 = null;
        } else {
            try {
                serializedValue2 = new SerializedValue(this.streamGraph.getStateBackend());
            } catch (IOException e2) {
                throw new FlinkRuntimeException("State backend is not serializable", e2);
            }
        }
        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(arrayList, arrayList2, arrayList3, CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(checkpointInterval).setCheckpointTimeout(checkpointConfig.getCheckpointTimeout()).setMinPauseBetweenCheckpoints(checkpointConfig.getMinPauseBetweenCheckpoints()).setMaxConcurrentCheckpoints(checkpointConfig.getMaxConcurrentCheckpoints()).setCheckpointRetentionPolicy(checkpointRetentionPolicy).setExactlyOnce(getCheckpointingMode(checkpointConfig) == CheckpointingMode.EXACTLY_ONCE).setPreferCheckpointForRecovery(checkpointConfig.isPreferCheckpointForRecovery()).setTolerableCheckpointFailureNumber(checkpointConfig.getTolerableCheckpointFailureNumber()).setUnalignedCheckpointsEnabled(checkpointConfig.isUnalignedCheckpointsEnabled()).setAlignmentTimeout(checkpointConfig.getAlignmentTimeout()).build(), serializedValue2, serializedValue));
    }

    private static String nameWithChainedSourcesInfo(String str, Collection<ChainedSourceInfo> collection) {
        return collection.isEmpty() ? str : String.format("%s [%s]", str, collection.stream().map(chainedSourceInfo -> {
            return chainedSourceInfo.getOperatorConfig().getOperatorName();
        }).collect(Collectors.joining(", ")));
    }
}
