/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.util.ClassLoaderUtil;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;

@Internal
public class StreamConfig
implements Serializable {
    private static final long serialVersionUID = 1L;
    public static final String SERIALIZED_UDF = "serializedUDF";
    public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass";
    private static final ConfigOption<Integer> NUMBER_OF_OUTPUTS = ConfigOptions.key((String)"numberOfOutputs").intType().defaultValue((Object)0);
    private static final ConfigOption<Integer> NUMBER_OF_NETWORK_INPUTS = ConfigOptions.key((String)"numberOfNetworkInputs").intType().defaultValue((Object)0);
    private static final String CHAINED_OUTPUTS = "chainedOutputs";
    private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
    private static final ConfigOption<Boolean> IS_CHAINED_VERTEX = ConfigOptions.key((String)"isChainedSubtask").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Integer> CHAIN_INDEX = ConfigOptions.key((String)"chainIndex").intType().defaultValue((Object)0);
    private static final ConfigOption<Integer> VERTEX_NAME = ConfigOptions.key((String)"vertexID").intType().defaultValue((Object)-1);
    private static final String ITERATION_ID = "iterationId";
    private static final String INPUTS = "inputs";
    private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
    private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
    private static final ConfigOption<Long> ITERATON_WAIT = ConfigOptions.key((String)"iterationWait").longType().defaultValue((Object)0L);
    private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs";
    private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs";
    private static final String IN_STREAM_EDGES = "inStreamEdges";
    private static final String OPERATOR_NAME = "operatorName";
    private static final String OPERATOR_ID = "operatorID";
    private static final ConfigOption<Boolean> CHAIN_END = ConfigOptions.key((String)"chainEnd").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> GRAPH_CONTAINING_LOOPS = ConfigOptions.key((String)"graphContainingLoops").booleanType().defaultValue((Object)false);
    private static final String ADDITIONAL_METRIC_VARIABLES = "additionalmetricvariables";
    private static final String CHECKPOINT_STORAGE = "checkpointstorage";
    private static final String STATE_BACKEND = "statebackend";
    private static final String TIMER_SERVICE_PROVIDER = "timerservice";
    private static final String STATE_PARTITIONER = "statePartitioner";
    private static final String STATE_KEY_SERIALIZER = "statekeyser";
    private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction.";
    private static final String ATTRIBUTE = "attribute";
    private static final String WATERMARK_DECLARATIONS = "watermarkDeclarations";
    private Set<AbstractInternalWatermarkDeclaration<?>> deserializedWatermarkDeclarations;
    private static final ConfigOption<Boolean> STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions.key((String)"statebackend.useManagedMemory").booleanType().noDefaultValue().withDescription("If state backend is specified, whether it uses managed memory.");
    private static final double DEFAULT_MANAGED_MEMORY_FRACTION = 0.0;
    private final Configuration config;
    private final transient Map<String, Object> toBeSerializedConfigObjects = new HashMap<String, Object>();
    private final transient Map<Integer, CompletableFuture<StreamConfig>> chainedTaskFutures = new HashMap<Integer, CompletableFuture<StreamConfig>>();
    private final transient CompletableFuture<StreamConfig> serializationFuture = new CompletableFuture();
    private final Set<String> removedKeys = new HashSet<String>();

    public StreamConfig(Configuration config) {
        this.config = config;
    }

    public Configuration getConfiguration() {
        return this.config;
    }

    public CompletableFuture<StreamConfig> getSerializationFuture() {
        return this.serializationFuture;
    }

    public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(Executor ioExecutor) {
        FutureUtils.combineAll(this.chainedTaskFutures.values()).thenAcceptAsync(chainedConfigs -> {
            try {
                this.serializeAllConfigs();
                InstantiationUtil.writeObjectToConfig(chainedConfigs.stream().collect(Collectors.toMap(StreamConfig::getVertexID, Function.identity())), (Configuration)this.config, (String)CHAINED_TASK_CONFIG);
                this.serializationFuture.complete(this);
            }
            catch (Throwable throwable) {
                this.serializationFuture.completeExceptionally(throwable);
            }
        }, ioExecutor);
        return this.serializationFuture;
    }

    public void serializeAllConfigs() {
        this.toBeSerializedConfigObjects.forEach((key, object) -> {
            try {
                InstantiationUtil.writeObjectToConfig((Object)object, (Configuration)this.config, (String)key);
            }
            catch (IOException e) {
                throw new StreamTaskException(String.format("Could not serialize object for key %s.", key), e);
            }
        });
    }

    @VisibleForTesting
    public void setAndSerializeTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
        try {
            InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, (Configuration)this.config, (String)CHAINED_TASK_CONFIG);
        }
        catch (IOException e) {
            throw new StreamTaskException("Could not serialize object for key chained task config.", e);
        }
    }

    public void setVertexID(Integer vertexID) {
        this.config.set(VERTEX_NAME, (Object)vertexID);
    }

    public Integer getVertexID() {
        return (Integer)this.config.get(VERTEX_NAME);
    }

    public void setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase managedMemoryUseCase, double fraction) {
        ConfigOption<Double> configOption = StreamConfig.getManagedMemoryFractionConfigOption(managedMemoryUseCase);
        Preconditions.checkArgument((fraction >= 0.0 && fraction <= 1.0 ? 1 : 0) != 0, (Object)String.format("%s should be in range [0.0, 1.0], but was: %s", configOption.key(), fraction));
        this.config.set(configOption, (Object)fraction);
    }

    public double getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase managedMemoryUseCase, Configuration jobConfig, Configuration taskManagerConfig, ClassLoader cl) {
        return ManagedMemoryUtils.convertToFractionOfSlot(managedMemoryUseCase, (Double)this.config.get(StreamConfig.getManagedMemoryFractionConfigOption(managedMemoryUseCase)), this.getAllManagedMemoryUseCases(), jobConfig, taskManagerConfig, this.config.getOptional(STATE_BACKEND_USE_MANAGED_MEMORY), cl);
    }

    private static ConfigOption<Double> getManagedMemoryFractionConfigOption(ManagedMemoryUseCase managedMemoryUseCase) {
        return ConfigOptions.key((String)(MANAGED_MEMORY_FRACTION_PREFIX + Preconditions.checkNotNull((Object)managedMemoryUseCase))).doubleType().defaultValue((Object)0.0);
    }

    private Set<ManagedMemoryUseCase> getAllManagedMemoryUseCases() {
        return this.config.keySet().stream().filter(key -> key.startsWith(MANAGED_MEMORY_FRACTION_PREFIX)).map(key -> ManagedMemoryUseCase.valueOf((String)key.replaceFirst(MANAGED_MEMORY_FRACTION_PREFIX, ""))).collect(Collectors.toSet());
    }

    public void setTypeSerializerOut(TypeSerializer<?> serializer) {
        this.setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
    }

    public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
        try {
            return (TypeSerializer)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)TYPE_SERIALIZER_OUT_1, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serializer.", e);
        }
    }

    public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
        this.setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
    }

    private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
        this.toBeSerializedConfigObjects.put(key, typeWrapper);
    }

    public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
        Preconditions.checkNotNull(outputTag, (String)"Side output id must not be null.");
        try {
            return (TypeSerializer)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId()), (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serializer.", e);
        }
    }

    public void setWatermarkDeclarations(byte[] serializedWatermarkDeclarations) {
        if (serializedWatermarkDeclarations != null) {
            this.config.setBytes(WATERMARK_DECLARATIONS, serializedWatermarkDeclarations);
        }
    }

    public Set<AbstractInternalWatermarkDeclaration<?>> getWatermarkDeclarations(ClassLoader cl) {
        if (this.deserializedWatermarkDeclarations != null) {
            return this.deserializedWatermarkDeclarations;
        }
        try {
            Set watermarkDeclarations = (Set)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)WATERMARK_DECLARATIONS, (ClassLoader)cl);
            this.deserializedWatermarkDeclarations = watermarkDeclarations == null ? Collections.emptySet() : watermarkDeclarations;
            return this.deserializedWatermarkDeclarations;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serializer.", e);
        }
    }

    public void setupNetworkInputs(TypeSerializer<?> ... serializers) {
        InputConfig[] inputs = new InputConfig[serializers.length];
        for (int i = 0; i < serializers.length; ++i) {
            inputs[i] = new NetworkInputConfig(serializers[i], i, InputRequirement.PASS_THROUGH);
        }
        this.setInputs(inputs);
    }

    public void setInputs(InputConfig ... inputs) {
        this.toBeSerializedConfigObjects.put(INPUTS, inputs);
    }

    public InputConfig[] getInputs(ClassLoader cl) {
        try {
            InputConfig[] inputs = (InputConfig[])InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)INPUTS, (ClassLoader)cl);
            if (inputs == null) {
                return new InputConfig[0];
            }
            return inputs;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not deserialize inputs", e);
        }
    }

    @Deprecated
    public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
        return this.getTypeSerializerIn(0, cl);
    }

    @Deprecated
    public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
        return this.getTypeSerializerIn(1, cl);
    }

    public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) {
        InputConfig[] inputs = this.getInputs(cl);
        Preconditions.checkState((index < inputs.length ? 1 : 0) != 0);
        Preconditions.checkState((boolean)(inputs[index] instanceof NetworkInputConfig), (String)"Input [%s] was assumed to be network input", (Object[])new Object[]{index});
        return ((NetworkInputConfig)inputs[index]).typeSerializer;
    }

    @VisibleForTesting
    public void setStreamOperator(StreamOperator<?> operator) {
        this.setStreamOperatorFactory(SimpleOperatorFactory.of(operator));
    }

    public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
        if (factory != null) {
            this.toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
            this.toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
        }
    }

    @VisibleForTesting
    public <T extends StreamOperator<?>> T getStreamOperator(ClassLoader cl) {
        SimpleOperatorFactory factory = (SimpleOperatorFactory)this.getStreamOperatorFactory(cl);
        return (T)factory.getOperator();
    }

    public <T extends StreamOperatorFactory<?>> T getStreamOperatorFactory(ClassLoader cl) {
        try {
            Preconditions.checkState((!this.removedKeys.contains(SERIALIZED_UDF) ? 1 : 0) != 0, (Object)String.format("%s has been removed.", SERIALIZED_UDF));
            return (T)((StreamOperatorFactory)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)SERIALIZED_UDF, (ClassLoader)cl));
        }
        catch (ClassNotFoundException e) {
            String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo((ClassLoader)cl);
            boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable((ClassNotFoundException)e, (ClassLoader)cl);
            String exceptionMessage = "Cannot load user class: " + e.getMessage() + "\nClassLoader info: " + classLoaderInfo + (loadableDoubleCheck ? "\nClass was actually found in classloader - deserialization issue." : "\nClass not resolvable through given classloader.");
            throw new StreamTaskException(exceptionMessage, e);
        }
        catch (Exception e) {
            throw new StreamTaskException("Cannot instantiate user function.", e);
        }
    }

    public <T extends StreamOperatorFactory<?>> Class<T> getStreamOperatorFactoryClass(ClassLoader cl) {
        try {
            return (Class)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)SERIALIZED_UDF_CLASS, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serialized udf class.", e);
        }
    }

    public void setIterationId(String iterationId) {
        this.config.setString(ITERATION_ID, iterationId);
    }

    public String getIterationId() {
        return this.config.getString(ITERATION_ID, "");
    }

    public void setIterationWaitTime(long time) {
        this.config.set(ITERATON_WAIT, (Object)time);
    }

    public long getIterationWaitTime() {
        return (Long)this.config.get(ITERATON_WAIT);
    }

    public void setNumberOfNetworkInputs(int numberOfInputs) {
        this.config.set(NUMBER_OF_NETWORK_INPUTS, (Object)numberOfInputs);
    }

    public int getNumberOfNetworkInputs() {
        return (Integer)this.config.get(NUMBER_OF_NETWORK_INPUTS);
    }

    public void setNumberOfOutputs(int numberOfOutputs) {
        this.config.set(NUMBER_OF_OUTPUTS, (Object)numberOfOutputs);
    }

    public int getNumberOfOutputs() {
        return (Integer)this.config.get(NUMBER_OF_OUTPUTS);
    }

    public void setOperatorNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
        this.toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, nonChainedOutputs);
    }

    public List<NonChainedOutput> getOperatorNonChainedOutputs(ClassLoader cl) {
        try {
            List nonChainedOutputs = (List)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)OP_NONCHAINED_OUTPUTS, (ClassLoader)cl);
            return nonChainedOutputs == null ? new ArrayList() : nonChainedOutputs;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate non chained outputs.", e);
        }
    }

    public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
        this.toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, chainedOutputs);
    }

    public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
        try {
            List chainedOutputs = (List)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)CHAINED_OUTPUTS, (ClassLoader)cl);
            return chainedOutputs == null ? new ArrayList() : chainedOutputs;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate chained outputs.", e);
        }
    }

    public void setInPhysicalEdges(List<StreamEdge> inEdges) {
        this.toBeSerializedConfigObjects.put(IN_STREAM_EDGES, inEdges);
    }

    public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
        try {
            List inEdges = (List)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)IN_STREAM_EDGES, (ClassLoader)cl);
            return inEdges == null ? new ArrayList() : inEdges;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate inputs.", e);
        }
    }

    public void setVertexNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
        this.toBeSerializedConfigObjects.put(VERTEX_NONCHAINED_OUTPUTS, nonChainedOutputs);
    }

    public List<NonChainedOutput> getVertexNonChainedOutputs(ClassLoader cl) {
        try {
            List nonChainedOutputs = (List)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)VERTEX_NONCHAINED_OUTPUTS, (ClassLoader)cl);
            return nonChainedOutputs == null ? new ArrayList() : nonChainedOutputs;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate outputs in order.", e);
        }
    }

    public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
        if (chainedTaskConfigs != null) {
            chainedTaskConfigs.forEach((id, config) -> this.chainedTaskFutures.put((Integer)id, config.getSerializationFuture()));
        }
    }

    public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
        try {
            Preconditions.checkState((!this.removedKeys.contains(CHAINED_TASK_CONFIG) ? 1 : 0) != 0, (Object)String.format("%s has been removed.", CHAINED_TASK_CONFIG));
            Map confs = (Map)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)CHAINED_TASK_CONFIG, (ClassLoader)cl);
            return confs == null ? new HashMap() : confs;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate configuration.", e);
        }
    }

    public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigsWithSelf(ClassLoader cl) {
        Map<Integer, StreamConfig> chainedTaskConfigs = this.getTransitiveChainedTaskConfigs(cl);
        chainedTaskConfigs.put(this.getVertexID(), this);
        return chainedTaskConfigs;
    }

    public void setOperatorID(OperatorID operatorID) {
        this.config.setBytes(OPERATOR_ID, operatorID.getBytes());
    }

    public OperatorID getOperatorID() {
        byte[] operatorIDBytes = this.config.getBytes(OPERATOR_ID, null);
        return new OperatorID((byte[])Preconditions.checkNotNull((Object)operatorIDBytes));
    }

    public void setOperatorName(String name) {
        this.config.setString(OPERATOR_NAME, name);
    }

    public String getOperatorName() {
        return this.config.getString(OPERATOR_NAME, null);
    }

    public void setChainIndex(int index) {
        this.config.set(CHAIN_INDEX, (Object)index);
    }

    public int getChainIndex() {
        return (Integer)this.config.get(CHAIN_INDEX);
    }

    public void setStateBackend(StateBackend backend) {
        if (backend != null) {
            this.toBeSerializedConfigObjects.put(STATE_BACKEND, backend);
            this.setStateBackendUsesManagedMemory(backend.useManagedMemory());
        }
    }

    @VisibleForTesting
    public void setStateBackendUsesManagedMemory(boolean usesManagedMemory) {
        this.config.set(STATE_BACKEND_USE_MANAGED_MEMORY, (Object)usesManagedMemory);
    }

    public void setSerializedStateBackend(SerializedValue<StateBackend> serializedStateBackend, boolean useManagedMemory) {
        if (serializedStateBackend != null) {
            this.config.setBytes(STATE_BACKEND, serializedStateBackend.getByteArray());
            this.setStateBackendUsesManagedMemory(useManagedMemory);
        }
    }

    public void setSerializedCheckpointStorage(SerializedValue<CheckpointStorage> serializedCheckpointStorage) {
        if (serializedCheckpointStorage != null) {
            this.config.setBytes(CHECKPOINT_STORAGE, serializedCheckpointStorage.getByteArray());
        }
    }

    public StateBackend getStateBackend(ClassLoader cl) {
        try {
            return (StateBackend)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)STATE_BACKEND, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate statehandle provider.", e);
        }
    }

    public void setAdditionalMetricVariables(@Nullable Map<String, String> additionalMetricVariables) {
        if (additionalMetricVariables != null) {
            this.toBeSerializedConfigObjects.put(ADDITIONAL_METRIC_VARIABLES, additionalMetricVariables);
        }
    }

    public Map<String, String> getAdditionalMetricVariables() {
        try {
            Map additionalMetricVariables = (Map)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)ADDITIONAL_METRIC_VARIABLES, (ClassLoader)this.getClass().getClassLoader());
            return additionalMetricVariables == null ? Collections.emptyMap() : additionalMetricVariables;
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate additional metric variables.", e);
        }
    }

    @VisibleForTesting
    public void setCheckpointStorage(CheckpointStorage storage) {
        if (storage != null) {
            this.toBeSerializedConfigObjects.put(CHECKPOINT_STORAGE, storage);
        }
    }

    public CheckpointStorage getCheckpointStorage(ClassLoader cl) {
        try {
            return (CheckpointStorage)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)CHECKPOINT_STORAGE, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate checkpoint storage.", e);
        }
    }

    public void setTimerServiceProvider(InternalTimeServiceManager.Provider timerServiceProvider) {
        if (timerServiceProvider != null) {
            this.toBeSerializedConfigObjects.put(TIMER_SERVICE_PROVIDER, timerServiceProvider);
        }
    }

    public InternalTimeServiceManager.Provider getTimerServiceProvider(ClassLoader cl) {
        try {
            return (InternalTimeServiceManager.Provider)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)TIMER_SERVICE_PROVIDER, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate timer service provider.", e);
        }
    }

    public void setStatePartitioner(int input, KeySelector<?, ?> partitioner) {
        this.toBeSerializedConfigObjects.put(STATE_PARTITIONER + input, partitioner);
    }

    public <IN, K extends Serializable> KeySelector<IN, K> getStatePartitioner(int input, ClassLoader cl) {
        try {
            return (KeySelector)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)(STATE_PARTITIONER + input), (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate state partitioner.", e);
        }
    }

    public void setStateKeySerializer(TypeSerializer<?> serializer) {
        this.toBeSerializedConfigObjects.put(STATE_KEY_SERIALIZER, serializer);
    }

    public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
        try {
            return (TypeSerializer)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)STATE_KEY_SERIALIZER, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
        }
    }

    public void setChainStart() {
        this.config.set(IS_CHAINED_VERTEX, (Object)true);
    }

    public boolean isChainStart() {
        return (Boolean)this.config.get(IS_CHAINED_VERTEX);
    }

    public void setChainEnd() {
        this.config.set(CHAIN_END, (Object)true);
    }

    public boolean isChainEnd() {
        return (Boolean)this.config.get(CHAIN_END);
    }

    public String toString() {
        ClassLoader cl = this.getClass().getClassLoader();
        StringBuilder builder = new StringBuilder();
        builder.append("\n=======================");
        builder.append("Stream Config");
        builder.append("=======================");
        builder.append("\nNumber of non-chained inputs: ").append(this.getNumberOfNetworkInputs());
        builder.append("\nNumber of non-chained outputs: ").append(this.getNumberOfOutputs());
        builder.append("\nOutput names: ").append(this.getOperatorNonChainedOutputs(cl));
        builder.append("\nPartitioning:");
        for (NonChainedOutput output : this.getOperatorNonChainedOutputs(cl)) {
            String outputName = output.getDataSetId().toString();
            builder.append("\n\t").append(outputName).append(": ").append(output.getPartitioner());
        }
        builder.append("\nChained subtasks: ").append(this.getChainedOutputs(cl));
        try {
            builder.append("\nOperator: ").append(this.getStreamOperatorFactoryClass(cl).getSimpleName());
        }
        catch (Exception e) {
            builder.append("\nOperator: Missing");
        }
        if (this.isChainStart() && this.getChainedOutputs(cl).size() > 0) {
            builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
            builder.append(this.getTransitiveChainedTaskConfigs(cl));
        }
        return builder.toString();
    }

    public void setGraphContainingLoops(boolean graphContainingLoops) {
        this.config.set(GRAPH_CONTAINING_LOOPS, (Object)graphContainingLoops);
    }

    public boolean isGraphContainingLoops() {
        return (Boolean)this.config.get(GRAPH_CONTAINING_LOOPS);
    }

    public void setAttribute(Attribute attribute) {
        if (attribute != null) {
            this.toBeSerializedConfigObjects.put(ATTRIBUTE, attribute);
        }
    }

    public Attribute getAttribute(ClassLoader cl) {
        try {
            return (Attribute)InstantiationUtil.readObjectFromConfig((Configuration)this.config, (String)ATTRIBUTE, (ClassLoader)cl);
        }
        catch (Exception e) {
            throw new StreamTaskException("Could not instantiate checkpoint storage.", e);
        }
    }

    public void clearInitialConfigs() {
        this.removedKeys.add(SERIALIZED_UDF);
        this.config.removeKey(SERIALIZED_UDF);
        this.removedKeys.add(CHAINED_TASK_CONFIG);
        this.config.removeKey(CHAINED_TASK_CONFIG);
    }

    public static boolean requiresSorting(InputConfig inputConfig) {
        return inputConfig instanceof NetworkInputConfig && ((NetworkInputConfig)inputConfig).getInputRequirement() == InputRequirement.SORTED;
    }

    public static class SourceInputConfig
    implements InputConfig {
        private final StreamEdge inputEdge;

        public SourceInputConfig(StreamEdge inputEdge) {
            this.inputEdge = inputEdge;
        }

        public StreamEdge getInputEdge() {
            return this.inputEdge;
        }

        public String toString() {
            return this.inputEdge.toString();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SourceInputConfig)) {
                return false;
            }
            SourceInputConfig other = (SourceInputConfig)obj;
            return Objects.equals(other.inputEdge, this.inputEdge);
        }

        public int hashCode() {
            return this.inputEdge.hashCode();
        }
    }

    public static class NetworkInputConfig
    implements InputConfig {
        private final TypeSerializer<?> typeSerializer;
        private final InputRequirement inputRequirement;
        private int inputGateIndex;

        public NetworkInputConfig(TypeSerializer<?> typeSerializer, int inputGateIndex) {
            this(typeSerializer, inputGateIndex, InputRequirement.PASS_THROUGH);
        }

        public NetworkInputConfig(TypeSerializer<?> typeSerializer, int inputGateIndex, InputRequirement inputRequirement) {
            this.typeSerializer = typeSerializer;
            this.inputGateIndex = inputGateIndex;
            this.inputRequirement = inputRequirement;
        }

        public TypeSerializer<?> getTypeSerializer() {
            return this.typeSerializer;
        }

        public int getInputGateIndex() {
            return this.inputGateIndex;
        }

        public InputRequirement getInputRequirement() {
            return this.inputRequirement;
        }
    }

    public static interface InputConfig
    extends Serializable {
    }

    public static enum InputRequirement {
        SORTED,
        PASS_THROUGH;

    }
}

