/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.api.transformations.StubTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.TestExpandingSink;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;

class StreamGraphGeneratorTest {
    // Package-private no-argument constructor; it performs no initialization of its own.
    StreamGraphGeneratorTest() {
    }

    /**
     * Verifies that savepoint restore settings set on the generator are visible on the
     * generated stream graph.
     */
    @Test
    void generatorForwardsSavepointRestoreSettings() {
        final String restorePath = "hello";
        StreamGraphGenerator generator = new StreamGraphGenerator(Collections.emptyList(), new ExecutionConfig(), new CheckpointConfig());
        generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(restorePath));

        SavepointRestoreSettings forwarded = generator.generate().getSavepointRestoreSettings();

        Assertions.assertThat((String)forwarded.getRestorePath()).isEqualTo(restorePath);
    }

    /**
     * Checks which buffer timeout ends up on each generated stream node.
     *
     * <p>The environment-wide timeout is 77. Operators "B" and "C" set explicit timeouts (0 and
     * 12), operator "D" sets none, and operator "A" sets -1; the test expects both "A" and "D" to
     * report the environment-wide value. Any node that is not one of the four named maps must be
     * the source.
     */
    @Test
    void testBufferTimeout() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(77L);
        env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5})
                .map((MapFunction & Serializable)value -> value).setBufferTimeout(-1L).name("A")
                .map((MapFunction & Serializable)value -> value).setBufferTimeout(0L).name("B")
                .map((MapFunction & Serializable)value -> value).setBufferTimeout(12L).name("C")
                .map((MapFunction & Serializable)value -> value).name("D");
        StreamGraph sg = env.getStreamGraph();
        for (StreamNode node : sg.getStreamNodes()) {
            switch (node.getOperatorName()) {
                case "A":
                    // Actual value first, expected second, so a failure message reads correctly.
                    Assertions.assertThat(node.getBufferTimeout()).isEqualTo(77L);
                    break;
                case "B":
                    Assertions.assertThat(node.getBufferTimeout()).isEqualTo(0L);
                    break;
                case "C":
                    Assertions.assertThat(node.getBufferTimeout()).isEqualTo(12L);
                    break;
                case "D":
                    Assertions.assertThat(node.getBufferTimeout()).isEqualTo(77L);
                    break;
                default:
                    // The only node without one of the names above is expected to be the source.
                    Assertions.assertThat((Object)node.getOperatorFactory()).isInstanceOf(SourceOperatorFactory.class);
                    break;
            }
        }
    }

    /**
     * Builds a topology that uses partitioning calls (rebalance, forward, global, broadcast,
     * shuffle) and unions, then checks the partitioners found on the resulting graph edges.
     */
    @Test
    void testVirtualTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 10});

        SingleOutputStreamOperator rebalanced = input.rebalance().map((MapFunction)new NoOpIntMap());

        // Several partitioning calls in a row; the assertion below expects the last one (broadcast).
        SingleOutputStreamOperator afterChain = rebalanced.forward().global().broadcast().map((MapFunction)new NoOpIntMap());
        afterChain.sinkTo((Sink)new DiscardingSink());

        SingleOutputStreamOperator broadcastOp = rebalanced.map((MapFunction)new NoOpIntMap()).name("broadcast");
        DataStream broadcastBranch = broadcastOp.broadcast();
        SingleOutputStreamOperator globalOp = rebalanced.map((MapFunction)new NoOpIntMap()).name("global");
        DataStream globalBranch = globalOp.global();
        SingleOutputStreamOperator shuffleOp = rebalanced.map((MapFunction)new NoOpIntMap()).name("shuffle");
        DataStream shuffleBranch = shuffleOp.shuffle();

        SingleOutputStreamOperator unioned = broadcastBranch.union(new DataStream[]{globalBranch}).union(new DataStream[]{shuffleBranch}).map((MapFunction)new NoOpIntMap()).name("union");
        unioned.sinkTo((Sink)new DiscardingSink());

        StreamGraph graph = env.getStreamGraph();

        StreamEdge intoRebalanced = (StreamEdge)graph.getStreamNode(rebalanced.getId()).getInEdges().get(0);
        Assertions.assertThat((Object)intoRebalanced.getPartitioner()).isInstanceOf(RebalancePartitioner.class);

        StreamEdge intoAfterChain = (StreamEdge)graph.getStreamNode(afterChain.getId()).getInEdges().get(0);
        Assertions.assertThat((Object)intoAfterChain.getPartitioner()).isInstanceOf(BroadcastPartitioner.class);
        Assertions.assertThat((int)graph.getSourceVertex(intoAfterChain).getId()).isEqualTo(rebalanced.getId());

        StreamEdge outOfBroadcast = (StreamEdge)graph.getStreamNode(broadcastOp.getId()).getOutEdges().get(0);
        Assertions.assertThat((Object)outOfBroadcast.getPartitioner()).isInstanceOf(BroadcastPartitioner.class);

        StreamEdge outOfGlobal = (StreamEdge)graph.getStreamNode(globalOp.getId()).getOutEdges().get(0);
        Assertions.assertThat((Object)outOfGlobal.getPartitioner()).isInstanceOf(GlobalPartitioner.class);

        StreamEdge outOfShuffle = (StreamEdge)graph.getStreamNode(shuffleOp.getId()).getOutEdges().get(0);
        Assertions.assertThat((Object)outOfShuffle.getPartitioner()).isInstanceOf(ShufflePartitioner.class);
    }

    /**
     * Checks that, after the stream graph is generated, the function wrapped by the UDF operator
     * reports the output type that was declared on the transformation.
     */
    @Test
    void testOutputTypeConfigurationWithUdfStreamOperator() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        OutputTypeConfigurableFunction configurableFunction = new OutputTypeConfigurableFunction();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 10});
        NoOpUdfOperator operator = new NoOpUdfOperator(configurableFunction);

        SingleOutputStreamOperator transformed = input.transform("no-op udf operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, operator);
        transformed.sinkTo((Sink)new DiscardingSink());

        // The graph itself is not inspected; the call is made for its effect on the function.
        env.getStreamGraph();

        Assertions.assertThat(operator).isInstanceOf(AbstractUdfStreamOperator.class);
        Assertions.assertThat(configurableFunction.getTypeInformation()).isEqualTo((Object)BasicTypeInfo.INT_TYPE_INFO);
    }

    /**
     * Checks that a one-input, output-type-configurable operator reports the declared output
     * type once the stream graph has been generated.
     */
    @Test
    void testOutputTypeConfigurationWithOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 10});
        OutputTypeConfigurableOperationWithOneInput configurableOperator = new OutputTypeConfigurableOperationWithOneInput();

        SingleOutputStreamOperator transformed = input.transform("Single input and output type configurable operation", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)configurableOperator);
        transformed.sinkTo((Sink)new DiscardingSink());

        // The graph itself is not inspected; the call is made for its effect on the operator.
        env.getStreamGraph();

        Assertions.assertThat(configurableOperator.getTypeInformation()).isEqualTo((Object)BasicTypeInfo.INT_TYPE_INFO);
    }

    /**
     * Checks that a two-input, output-type-configurable operator reports the declared output
     * type once the stream graph has been generated.
     */
    @Test
    void testOutputTypeConfigurationWithTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource firstInput = env.fromData((Object[])new Integer[]{1, 10});
        DataStreamSource secondInput = env.fromData((Object[])new Integer[]{2, 11});
        ConnectedStreams connected = firstInput.connect((DataStream)secondInput);
        OutputTypeConfigurableOperationWithTwoInputs configurableOperator = new OutputTypeConfigurableOperationWithTwoInputs();

        SingleOutputStreamOperator transformed = connected.transform("Two input and output type configurable operation", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TwoInputStreamOperator)configurableOperator);
        transformed.sinkTo((Sink)new DiscardingSink());

        // The graph itself is not inspected; the call is made for its effect on the operator.
        env.getStreamGraph();

        Assertions.assertThat(configurableOperator.getTypeInformation()).isEqualTo((Object)BasicTypeInfo.INT_TYPE_INFO);
    }

    /**
     * Wires three sources into one multiple-input transformation and checks the node count and
     * the edges between each source and the transformation.
     */
    @Test
    void testMultipleInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource intSource = env.fromData((Object[])new Integer[]{1, 10});
        DataStreamSource longSource = env.fromData((Object[])new Long[]{2L, 11L});
        DataStreamSource stringSource = env.fromData((Object[])new String[]{"42", "44"});

        MultipleInputTransformation transform = new MultipleInputTransformation("My Operator", (StreamOperatorFactory)new MultipleInputOperatorFactory(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, 3);
        Transformation wired = (Transformation)transform.addInput(intSource.getTransformation()).addInput(longSource.getTransformation()).addInput(stringSource.getTransformation());
        env.addOperator(wired);

        StreamGraph graph = env.getStreamGraph();
        Assertions.assertThat((int)graph.getStreamNodes().size()).isEqualTo(4);

        DataStreamSource[] sources = {intSource, longSource, stringSource};
        // Exactly one edge from each source into the multiple-input operator ...
        for (DataStreamSource source : sources) {
            Assertions.assertThat((List)graph.getStreamEdges(source.getId(), transform.getId())).hasSize(1);
        }
        // ... and that edge is the only one listed for the source.
        for (DataStreamSource source : sources) {
            Assertions.assertThat((List)graph.getStreamEdges(source.getId())).hasSize(1);
        }
        Assertions.assertThat((List)graph.getStreamEdges(transform.getId())).hasSize(0);
    }

    /**
     * Builds a topology mixing forward, shuffle, broadcast and rescale connections and checks,
     * per edge, whether it reports support for unaligned checkpoints.
     *
     * <p>As asserted below: the two shuffle edges report support; the forward edge, the rescale
     * edge and both inputs of the broadcast-connected operator do not.
     */
    @Test
    void testUnalignedCheckpointDisabledOnPointwise() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(42);

        DataStreamSource firstSource = env.fromSequence(1L, 10L);
        SingleOutputStreamOperator forwarded = firstSource.forward().map((MapFunction & Serializable)v -> v);
        DataStreamSource secondSource = env.fromSequence(2L, 11L);
        SingleOutputStreamOperator shuffled = secondSource.shuffle().map((MapFunction & Serializable)v -> v);

        MapStateDescriptor stateDescriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO);
        BroadcastStream broadcastSide = forwarded.broadcast(new MapStateDescriptor[]{stateDescriptor});
        SingleOutputStreamOperator joined = shuffled.connect(broadcastSide).process((BroadcastProcessFunction)new BroadcastProcessFunction<Long, Long, Long>(){

            public void processElement(Long value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<Long> out) {
            }

            public void processBroadcastElement(Long value, BroadcastProcessFunction.Context ctx, Collector<Long> out) {
            }
        });
        SingleOutputStreamOperator reshuffled = joined.shuffle().map((MapFunction & Serializable)v -> v);
        SingleOutputStreamOperator rescaled = reshuffled.rescale().map((MapFunction & Serializable)v -> v).setParallelism(1337);

        StreamGraph graph = env.getStreamGraph();
        Assertions.assertThat((int)graph.getStreamNodes().size()).isEqualTo(7);

        Condition supported = (Condition)HamcrestCondition.matching(supportsUnalignedCheckpoints(true));
        Condition unsupported = (Condition)HamcrestCondition.matching(supportsUnalignedCheckpoints(false));

        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)firstSource, (DataStream<Long>)forwarded)).is(unsupported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)secondSource, (DataStream<Long>)shuffled)).is(supported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)forwarded, (DataStream<Long>)joined)).is(unsupported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)shuffled, (DataStream<Long>)joined)).is(unsupported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)joined, (DataStream<Long>)reshuffled)).is(supported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)reshuffled, (DataStream<Long>)rescaled)).is(unsupported);
    }

    /**
     * Connects a keyed stream with a broadcast stream and checks that none of the three
     * resulting edges reports support for unaligned checkpoints.
     */
    @Test
    void testUnalignedCheckpointDisabledOnBroadcast() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(42);

        DataStreamSource broadcastSource = env.fromSequence(1L, 10L);
        SingleOutputStreamOperator broadcastMap = broadcastSource.broadcast().map((MapFunction & Serializable)v -> v);
        DataStreamSource keyedSource = env.fromSequence(2L, 11L);
        KeyedStream keyed = keyedSource.keyBy((KeySelector & Serializable)record -> 0L);

        MapStateDescriptor stateDescriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO);
        BroadcastStream broadcastSide = broadcastMap.broadcast(new MapStateDescriptor[]{stateDescriptor});
        SingleOutputStreamOperator joined = keyed.connect(broadcastSide).process((KeyedBroadcastProcessFunction)new KeyedBroadcastProcessFunction<Long, Long, Long, Long>(){

            public void processElement(Long value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<Long> out) {
            }

            public void processBroadcastElement(Long value, KeyedBroadcastProcessFunction.Context ctx, Collector<Long> out) {
            }
        });

        StreamGraph graph = env.getStreamGraph();
        Assertions.assertThat((int)graph.getStreamNodes().size()).isEqualTo(4);

        Condition unsupported = (Condition)HamcrestCondition.matching(supportsUnalignedCheckpoints(false));
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)broadcastSource, (DataStream<Long>)broadcastMap)).is(unsupported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)keyedSource, (DataStream<Long>)joined)).is(unsupported);
        Assertions.assertThat((Object)edge(graph, (DataStream<Long>)broadcastMap, (DataStream<Long>)joined)).is(unsupported);
    }

    /**
     * Returns the single edge connecting {@code op1} to {@code op2} in the given graph.
     *
     * <p>Fails the calling test via an AssertJ assertion unless exactly one such edge exists.
     *
     * @param streamGraph graph to look the edge up in
     * @param op1 stream whose id is used as the edge's source
     * @param op2 stream whose id is used as the edge's target
     * @return the only edge found between the two ids
     */
    private static StreamEdge edge(StreamGraph streamGraph, DataStream<Long> op1, DataStream<Long> op2) {
        int sourceId = op1.getId();
        int targetId = op2.getId();
        List<StreamEdge> connecting = streamGraph.getStreamEdges(sourceId, targetId);
        Assertions.assertThat(connecting).hasSize(1);
        return connecting.get(0);
    }

    /**
     * Creates a Hamcrest matcher that compares {@link StreamEdge#supportsUnalignedCheckpoints()}
     * against the given value.
     *
     * @param enabled the value the edge is expected to report
     * @return a matcher over stream edges
     */
    private static Matcher<StreamEdge> supportsUnalignedCheckpoints(boolean enabled) {
        // The same text serves as both the feature description and the feature name.
        final String feature = "supports unaligned checkpoint";
        Matcher<Object> expectedValue = Matchers.equalTo((Object)enabled);
        return new FeatureMatcher<StreamEdge, Boolean>(expectedValue, feature, feature){

            protected Boolean featureValueOf(StreamEdge candidate) {
                return candidate.supportsUnalignedCheckpoints();
            }
        };
    }

    /**
     * Builds a keyed map with a job-wide max parallelism and checks the keyed operator's node.
     *
     * <p>The keyed node is expected to carry the job-wide max parallelism and to have exactly one
     * input edge with a partitioner attached.
     */
    @Test
    void testSetupOfKeyGroupPartitioner() {
        int maxParallelism = 42;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setMaxParallelism(maxParallelism);
        DataStreamSource source = env.fromData((Object[])new Integer[]{1, 2, 3});
        SingleOutputStreamOperator keyedResult = source.keyBy((KeySelector & Serializable)value -> value).map((MapFunction)new NoOpIntMap());
        keyedResult.sinkTo((Sink)new DiscardingSink());
        StreamGraph graph = env.getStreamGraph();
        StreamNode keyedResultNode = graph.getStreamNode(Integer.valueOf(keyedResult.getId()));

        // No operator-level max parallelism was set, so the job-wide value is expected here.
        Assertions.assertThat((int)keyedResultNode.getMaxParallelism()).isEqualTo(maxParallelism);

        // The keyed map has a single input, and that edge must carry a partitioner.
        Assertions.assertThat((List)keyedResultNode.getInEdges()).hasSize(1);
        StreamPartitioner streamPartitioner = ((StreamEdge)keyedResultNode.getInEdges().get(0)).getPartitioner();
        Assertions.assertThat((Object)streamPartitioner).isNotNull();
    }

    /**
     * Checks that an operator without its own max parallelism reports the job-wide value, while
     * an operator with an explicit max parallelism keeps its own.
     */
    @Test
    void testMaxParallelismForwarding() {
        final int jobWideMaxParallelism = 42;
        final int explicitMaxParallelism = 17;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setMaxParallelism(jobWideMaxParallelism);
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});

        SingleOutputStreamOperator inheriting = input.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap());
        SingleOutputStreamOperator overriding = inheriting.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap()).setMaxParallelism(explicitMaxParallelism);
        overriding.sinkTo((Sink)new DiscardingSink());

        StreamGraph graph = env.getStreamGraph();
        StreamNode inheritingNode = graph.getStreamNode(inheriting.getId());
        StreamNode overridingNode = graph.getStreamNode(overriding.getId());

        Assertions.assertThat((int)inheritingNode.getMaxParallelism()).isEqualTo(jobWideMaxParallelism);
        Assertions.assertThat((int)overridingNode.getMaxParallelism()).isEqualTo(explicitMaxParallelism);
    }

    /**
     * Chains four keyed maps with different parallelism / max-parallelism settings and checks
     * that the two operators with an explicit max parallelism report exactly that value.
     */
    @Test
    void testAutoMaxParallelism() {
        final int jobParallelism = 42;
        final int operatorParallelism = 17;
        final int explicitMaxParallelism = 21;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(jobParallelism);
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});

        // Stages 1 and 2 set no max parallelism and are not asserted on; they only shape the pipeline.
        SingleOutputStreamOperator stage1 = input.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap());
        SingleOutputStreamOperator stage2 = stage1.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap()).setParallelism(operatorParallelism);
        SingleOutputStreamOperator stage3 = stage2.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap()).setMaxParallelism(explicitMaxParallelism);
        SingleOutputStreamOperator stage4 = stage3.keyBy((KeySelector & Serializable)v -> v).map((MapFunction)new NoOpIntMap()).setMaxParallelism(explicitMaxParallelism).setParallelism(operatorParallelism);
        stage4.sinkTo((Sink)new DiscardingSink());

        StreamGraph graph = env.getStreamGraph();
        StreamNode stage3Node = graph.getStreamNode(stage3.getId());
        StreamNode stage4Node = graph.getStreamNode(stage4.getId());

        Assertions.assertThat((int)stage3Node.getMaxParallelism()).isEqualTo(explicitMaxParallelism);
        Assertions.assertThat((int)stage4Node.getMaxParallelism()).isEqualTo(explicitMaxParallelism);
    }

    /**
     * Connects two inputs that have their own max parallelism (128 and 129), keys both sides and
     * checks the co-map node built on top of them.
     *
     * <p>The co-map sets no max parallelism of its own, so it is expected to report the job-wide
     * value, and it must have two input edges, each with a partitioner attached.
     */
    @Test
    void testMaxParallelismWithConnectedKeyedStream() {
        int maxParallelism = 42;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator input1 = env.fromSequence(1L, 4L).setMaxParallelism(128);
        SingleOutputStreamOperator input2 = env.fromSequence(1L, 4L).setMaxParallelism(129);
        env.getConfig().setMaxParallelism(maxParallelism);
        SingleOutputStreamOperator keyedResult = input1.connect((DataStream)input2).keyBy((KeySelector & Serializable)value -> value, (KeySelector & Serializable)value -> value).map((CoMapFunction)new NoOpLongCoMap());
        keyedResult.sinkTo((Sink)new DiscardingSink());
        StreamGraph graph = env.getStreamGraph();
        StreamNode keyedResultNode = graph.getStreamNode(Integer.valueOf(keyedResult.getId()));

        // The inputs' own max parallelism must not leak into the downstream keyed operator.
        Assertions.assertThat((int)keyedResultNode.getMaxParallelism()).isEqualTo(maxParallelism);

        // One input edge per connected side, each carrying a partitioner.
        Assertions.assertThat((List)keyedResultNode.getInEdges()).hasSize(2);
        StreamPartitioner streamPartitioner1 = ((StreamEdge)keyedResultNode.getInEdges().get(0)).getPartitioner();
        StreamPartitioner streamPartitioner2 = ((StreamEdge)keyedResultNode.getInEdges().get(1)).getPartitioner();
        Assertions.assertThat((Object)streamPartitioner1).isNotNull();
        Assertions.assertThat((Object)streamPartitioner2).isNotNull();
    }

    /**
     * Attaches 32 alternating sinks and maps to one source and renders the streaming plan as
     * JSON.
     *
     * <p>There is no explicit assertion; the test passes as long as plan generation does not
     * throw.
     */
    @Test
    void testSinkIdComparison() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});

        for (int index = 0; index < 32; ++index) {
            boolean even = index % 2 == 0;
            if (even) {
                input.addSink((SinkFunction)new SinkFunction<Integer>(){

                    public void invoke(Integer value, SinkFunction.Context ctx) throws Exception {
                    }
                });
            } else {
                input.map((MapFunction & Serializable)x -> x + 1);
            }
        }

        StreamGraph graph = env.getStreamGraph();
        graph.getStreamingPlanAsJSON();
    }

    /**
     * Generates a graph directly from a source and a map transformation and checks that every
     * node is placed in the slot sharing group named "default".
     */
    @Test
    void testEnableSlotSharing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});
        SingleOutputStreamOperator incremented = input.map((MapFunction & Serializable)x -> x + 1);

        List<Transformation<?>> transformations = new ArrayList<>();
        transformations.add(input.getTransformation());
        transformations.add(incremented.getTransformation());

        StreamGraphGenerator generator = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig());
        StreamGraph graph = generator.generate();

        for (StreamNode node : graph.getStreamNodes()) {
            Assertions.assertThat((String)node.getSlotSharingGroup()).isEqualTo("default");
        }
    }

    /**
     * Declares an operator-scope managed memory weight on the source transformation and checks
     * that only the source node carries it in the generated graph.
     */
    @Test
    void testSetManagedMemoryWeight() {
        final int weight = 123;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator namedSource = env.fromData((Object[])new Integer[]{1, 2, 3}).name("source");
        namedSource.getTransformation().declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, weight);
        namedSource.print().name("sink");

        StreamGraph graph = env.getStreamGraph();
        for (StreamNode node : graph.getStreamNodes()) {
            boolean isSource = node.getOperatorName().contains("source");
            if (isSource) {
                Integer declared = (Integer)node.getManagedMemoryOperatorScopeUseCaseWeights().get(ManagedMemoryUseCase.OPERATOR);
                Assertions.assertThat(declared).isEqualTo(weight);
            } else {
                // Every other node must not have any operator-scope weights declared.
                Assertions.assertThat((Map)node.getManagedMemoryOperatorScopeUseCaseWeights()).isEmpty();
            }
        }
    }

    /**
     * Passes a map of slot sharing group resources to the generator and checks that the
     * generated graph returns the same profile for each group name.
     */
    @Test
    void testSetSlotSharingResource() {
        final String groupA = "a";
        final String groupB = "b";
        final String defaultGroup = "default";
        ResourceProfile profileA = ResourceProfile.fromResources(1.0, 10);
        ResourceProfile profileB = ResourceProfile.fromResources(2.0, 20);
        ResourceProfile defaultProfile = ResourceProfile.fromResources(3.0, 30);

        HashMap<String, ResourceProfile> groupResources = new HashMap<String, ResourceProfile>();
        groupResources.put(groupA, profileA);
        groupResources.put(groupB, profileB);
        groupResources.put(defaultGroup, defaultProfile);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator sourceInA = env.fromData((Object[])new Integer[]{1, 2, 3}).slotSharingGroup(groupA);
        SingleOutputStreamOperator mapInB = sourceInA.map((MapFunction & Serializable)x -> x + 1).slotSharingGroup(groupB);
        // This map sets no slot sharing group of its own.
        SingleOutputStreamOperator ungroupedMap = mapInB.map((MapFunction & Serializable)x -> x * 2);

        List<Transformation<?>> transformations = new ArrayList<>();
        transformations.add(sourceInA.getTransformation());
        transformations.add(mapInB.getTransformation());
        transformations.add(ungroupedMap.getTransformation());

        StreamGraph graph = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig())
                .setSlotSharingGroupResource(groupResources)
                .generate();

        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource(groupA)).hasValue((Object)profileA);
        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource(groupB)).hasValue((Object)profileB);
        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource(defaultGroup)).hasValue((Object)defaultProfile);
    }

    /**
     * Checks that a savepoint path supplied through the configuration shows up as the restore
     * settings of the generated graph.
     */
    @Test
    void testSettingSavepointRestoreSettings() {
        final String savepointPath = "/tmp/savepoint";
        Configuration configuration = new Configuration();
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);

        StreamGraphGenerator generator = new StreamGraphGenerator(Collections.emptyList(), new ExecutionConfig(), new CheckpointConfig(), configuration);
        SavepointRestoreSettings actual = generator.generate().getSavepointRestoreSettings();

        Assertions.assertThat((Object)actual).isEqualTo((Object)SavepointRestoreSettings.forPath(savepointPath));
    }

    /**
     * Checks that restore settings passed to the generator's setter take precedence over a
     * savepoint path supplied through the configuration.
     */
    @Test
    void testSettingSavepointRestoreSettingsSetterOverrides() {
        final String configuredPath = "/tmp/savepoint";
        final String overridingPath = "/tmp/savepoint1";

        Configuration configuration = new Configuration();
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, configuredPath);
        StreamGraphGenerator generator = new StreamGraphGenerator(Collections.emptyList(), new ExecutionConfig(), new CheckpointConfig(), configuration);
        generator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(overridingPath));

        SavepointRestoreSettings actual = generator.generate().getSavepointRestoreSettings();

        Assertions.assertThat((Object)actual).isEqualTo((Object)SavepointRestoreSettings.forPath(overridingPath));
    }

    /**
     * Assigns slot sharing groups (some with resources, one without) along a pipeline and checks
     * the resource profiles the generated graph reports for "ssg1", "ssg2" and "default".
     */
    @Test
    void testConfigureSlotSharingGroupResource() {
        SlotSharingGroup firstGroup = SlotSharingGroup.newBuilder("ssg1").setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
        SlotSharingGroup secondGroup = SlotSharingGroup.newBuilder("ssg2").setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
        SlotSharingGroup defaultGroup = SlotSharingGroup.newBuilder("default").setCpuCores(3.0).setTaskHeapMemoryMB(300).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The source joins "ssg1" by name only; the sink at the end supplies the group object.
        SingleOutputStreamOperator input = env.fromData((Object[])new Integer[]{1}).slotSharingGroup("ssg1");
        input.map((MapFunction & Serializable)v -> v).slotSharingGroup(secondGroup)
                .map((MapFunction & Serializable)v -> v * 2)
                .map((MapFunction & Serializable)v -> v * 3).slotSharingGroup(SlotSharingGroup.newBuilder("ssg4").build())
                .map((MapFunction & Serializable)v -> v * 4).slotSharingGroup(defaultGroup)
                .sinkTo((Sink)new DiscardingSink()).slotSharingGroup(firstGroup);

        StreamGraph graph = env.getStreamGraph();

        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource("ssg1")).hasValue((Object)ResourceProfile.fromResources(1.0, 100));
        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource("ssg2")).hasValue((Object)ResourceProfile.fromResources(2.0, 200));
        Assertions.assertThat((Optional)graph.getSlotSharingGroupResource("default")).hasValue((Object)ResourceProfile.fromResources(3.0, 300));
    }

    /**
     * Two slot sharing groups share the name "ssg" but declare different resources. Building
     * the stream graph is expected to fail with an {@link IllegalArgumentException}.
     */
    @Test
    void testConflictSlotSharingGroup() {
        SlotSharingGroup original = SlotSharingGroup.newBuilder("ssg").setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
        // Same name as above, different CPU and heap settings.
        SlotSharingGroup conflicting = SlotSharingGroup.newBuilder("ssg").setCpuCores(2.0).setTaskHeapMemoryMB(200).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.fromData((Object[])new Integer[]{1}).slotSharingGroup(original);
        source.map((MapFunction & Serializable)value -> value)
                .slotSharingGroup(conflicting)
                .sinkTo((Sink)new DiscardingSink())
                .slotSharingGroup(conflicting);

        Assertions.assertThatThrownBy(() -> env.getStreamGraph())
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Feeds the generator a regular source transformation together with a
     * {@code FailingTransformation} that reports the same hash code and claims to be equal to
     * everything. Generation is expected to fail with "Unknown transformation:
     * FailingTransformation".
     *
     * <p>NOTE(review): presumably this shows the generator does not mistake the second
     * transformation for the first via {@code equals}/{@code hashCode} - confirm against the
     * generator implementation.
     */
    @Test
    void testTrackTransformationsByIdentity() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation noopTransformation = env.fromSequence(1L, 2L).getTransformation();
        // Collides with noopTransformation on hashCode() and on equals().
        Transformation collidingTransformation = new FailingTransformation(noopTransformation.hashCode());

        StreamGraphGenerator generator = new StreamGraphGenerator(
                Arrays.asList(new Transformation[]{noopTransformation, collidingTransformation}),
                new ExecutionConfig(),
                new CheckpointConfig());

        Assertions.assertThatThrownBy(() -> generator.generate())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Unknown transformation: FailingTransformation");
    }

    /**
     * Inserts a hand-built partition transformation that requests {@code BATCH} exchange and
     * checks that the single edge between the two lowest-id stream nodes ends up with
     * {@code UNDEFINED} exchange mode.
     *
     * <p>The environment's runtime mode is not set explicitly in this test.
     */
    @Test
    void testResetBatchExchangeModeInStreamingExecution() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});

        // Rebalance partitioning with an explicit BATCH exchange mode.
        PartitionTransformation batchPartition = new PartitionTransformation(input.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.BATCH);
        DataStream partitioned = new DataStream(env, (Transformation)batchPartition);
        partitioned.map((MapFunction & Serializable)value -> value).print();

        StreamGraph streamGraph = env.getStreamGraph();
        // Sort node ids so the edge under test can be addressed by its two smallest endpoints.
        List sortedNodeIds = streamGraph.getStreamNodes().stream().map(StreamNode::getId).sorted(Integer::compare).collect(Collectors.toList());
        int lowestId = ((Integer)sortedNodeIds.get(0)).intValue();
        int secondLowestId = ((Integer)sortedNodeIds.get(1)).intValue();

        ((ListAssert)Assertions.assertThat((List)streamGraph.getStreamEdges(lowestId, secondLowestId)).hasSize(1))
                .satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((StreamEdge)e.get(0)).getExchangeMode()).isEqualTo((Object)StreamExchangeMode.UNDEFINED)});
    }

    /**
     * Attaches a {@code TestExpandingSink} with parallelism {@code -1} to a job whose
     * environment parallelism is 2, then checks that every stream node whose operator name does
     * not start with "Source" reports parallelism 2.
     */
    @Test
    void testAutoParallelismForExpandedTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource input = env.fromData((Object[])new Integer[]{1, 2, 3});
        input.sinkTo((Sink)new TestExpandingSink()).setParallelism(-1);

        StreamGraph graph = env.getStreamGraph();
        // Nodes named "Source..." are excluded from the check.
        graph.getStreamNodes().stream()
                .filter(node -> !node.getOperatorName().startsWith("Source"))
                .forEach(node -> Assertions.assertThat((int)node.getParallelism()).isEqualTo(2));
    }

    /**
     * Caches the result of a keyed reduce in BATCH mode and verifies both phases of the cache:
     * first the graph must contain a matching "CacheWrite" node, and after the dataset id has
     * been registered as completed, a matching "CacheRead" node.
     */
    @Test
    void testCacheTransformation() {
        int upstreamParallelism = 3;
        TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStreamSource source = env.fromData(new Integer[]{1, 2, 3});
        CachedDataStream cachedStream = source
                .keyBy((KeySelector & Serializable)i -> i)
                .reduce(Integer::sum)
                .setParallelism(upstreamParallelism)
                .cache();
        Assertions.assertThat((Object)cachedStream.getTransformation()).isInstanceOf(CacheTransformation.class);
        CacheTransformation cacheTransformation = (CacheTransformation)cachedStream.getTransformation();

        // First use of the cached stream: the graph is checked for the cache-producing node.
        cachedStream.print();
        StreamGraph streamGraph = env.getStreamGraph();
        this.verifyCacheProduceNode(upstreamParallelism, (CacheTransformation<Integer>)cacheTransformation, streamGraph, null);

        // Register the dataset as completed, then use the stream again and check the consuming node.
        env.addCompletedClusterDatasetIds(cacheTransformation.getDatasetId());
        cachedStream.print();
        this.verifyCacheConsumeNode(env, upstreamParallelism, (CacheTransformation<Integer>)cacheTransformation);
    }

    /**
     * Same two-phase cache check as for a regular stream, but the cached stream is the side
     * output tagged "1" of a map operator. The "CacheWrite" node's in-edge is additionally
     * expected to carry output tag id "1".
     */
    @Test
    void testCacheSideOutput() {
        int upstreamParallelism = 2;
        TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        SingleOutputStreamOperator stream = env.fromData(new Integer[]{1, 2, 3})
                .map((MapFunction & Serializable)i -> i)
                .setParallelism(upstreamParallelism);
        CachedDataStream sideOutputCache = stream.getSideOutput((OutputTag)new OutputTag<Integer>("1"){}).cache();
        Assertions.assertThat((Object)sideOutputCache.getTransformation()).isInstanceOf(CacheTransformation.class);
        CacheTransformation cacheTransformation = (CacheTransformation)sideOutputCache.getTransformation();

        // First use: the graph is checked for the cache-producing node fed by the side output.
        sideOutputCache.print();
        StreamGraph streamGraph = env.getStreamGraph();
        this.verifyCacheProduceNode(upstreamParallelism, (CacheTransformation<Integer>)cacheTransformation, streamGraph, "1");

        // Register the dataset as completed, then use the stream again and check the consuming node.
        env.addCompletedClusterDatasetIds(cacheTransformation.getDatasetId());
        sideOutputCache.print();
        this.verifyCacheConsumeNode(env, upstreamParallelism, (CacheTransformation<Integer>)cacheTransformation);
    }

    /**
     * Builds {@code source -> mapper -> "primary output"} and then attaches two further sinks
     * to {@code StubTransformation}s whose predicate selects the transformation named "mapper".
     *
     * <p>The resulting graph is expected to have five stream nodes: one source, one mapper and
     * three sinks that all hang off the mapper through forward edges.
     */
    @Test
    void testStubTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // Primary pipeline; the names "source" and "mapper" are what the stubs below match on.
        env.fromData((Object[])new Integer[]{1, 2, 3}).setParallelism(2).name("source").map((MapFunction & Serializable)i -> i).name("mapper").sinkTo((Sink)new DiscardingSink()).name("primary output");
        // Two independent stub-backed streams, each selecting the transformation named "mapper".
        new DataStream(env, (Transformation)StubTransformation.create((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, t -> t.getName().equals("mapper"))).sinkTo((Sink)new DiscardingSink()).name("secondary output");
        new DataStream(env, (Transformation)StubTransformation.create((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, t -> t.getName().equals("mapper"))).sinkTo((Sink)new DiscardingSink()).name("tertiary output");
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(5);
        // Walks the graph in one assertion chain:
        //   1. exactly one source id; its node uses SourceOperatorFactory with parallelism 2
        //   2. that node has a single out-edge using ForwardPartitioner
        //   3. the edge's target node uses SimpleUdfStreamOperatorFactory with parallelism 2
        //   4. the target has three out-edges, all using ForwardPartitioner
        //   5. every one of those edges ends at a node using SinkWriterOperatorFactory
        ((ListAssert)((ListAssert)((ListAssert)((ObjectAssert)((ObjectAssert)((ListAssert)((ObjectAssert)Assertions.assertThat((Collection)streamGraph.getSourceIDs()).singleElement()).extracting(arg_0 -> ((StreamGraph)streamGraph).getStreamNode(arg_0)).returns(SourceOperatorFactory.class, node -> node.getOperatorFactory().getClass()).returns((Object)2, StreamNode::getParallelism).extracting(StreamNode::getOutEdges, InstanceOfAssertFactories.list(StreamEdge.class))).singleElement()).returns(ForwardPartitioner.class, edge -> edge.getPartitioner().getClass())).extracting(edge -> streamGraph.getStreamNode(Integer.valueOf(edge.getTargetId()))).returns(SimpleUdfStreamOperatorFactory.class, node -> node.getOperatorFactory().getClass()).returns((Object)2, StreamNode::getParallelism).extracting(StreamNode::getOutEdges, InstanceOfAssertFactories.list(StreamEdge.class))).hasSize(3)).allMatch(edge -> edge.getPartitioner() instanceof ForwardPartitioner)).map(edge -> streamGraph.getStreamNode(Integer.valueOf(edge.getTargetId()))).allMatch(sink -> sink.getOperatorFactory() instanceof SinkWriterOperatorFactory);
    }

    /**
     * Uses a {@code StubTransformation} looking for a transformation named "mapper" in a job
     * that defines no such transformation. Building the stream graph is expected to fail with
     * an {@link IllegalStateException} mentioning the missing upstream transformation.
     */
    @Test
    void testStubTransformationFailsWithoutMatch() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // The stub-backed sink is the only thing in the job.
        new DataStream(env, (Transformation)StubTransformation.create((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, t -> t.getName().equals("mapper")))
                .sinkTo((Sink)new DiscardingSink())
                .name("secondary output");

        Assertions.assertThatThrownBy(() -> env.getStreamGraph())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("No upstream transformation found for StubTransformation");
    }

    /**
     * Points a {@code StubTransformation} declared with {@code LONG_TYPE_INFO} at the "mapper"
     * of a pipeline built from {@code Integer} elements. Building the stream graph is expected
     * to fail with an {@link IllegalStateException} describing the output type mismatch.
     */
    @Test
    void testStubTransformationFailWithTypeMismatch() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.fromData((Object[])new Integer[]{1, 2, 3})
                .setParallelism(2)
                .name("source")
                .map((MapFunction & Serializable)i -> i)
                .name("mapper")
                .sinkTo((Sink)new DiscardingSink())
                .name("primary output");

        // The stub is declared with LONG type information while selecting the Integer mapper.
        new DataStream(env, (Transformation)StubTransformation.create((TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, t -> t.getName().equals("mapper")))
                .sinkTo((Sink)new DiscardingSink())
                .name("secondary output");

        Assertions.assertThatThrownBy(() -> env.getStreamGraph())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("The output type of the input transformation does not match the expected output type of the StubTransformation");
    }

    /**
     * Asserts that the graph contains a node named "CacheWrite" that writes the given cache.
     *
     * <p>Nodes with any other operator name are skipped. For a "CacheWrite" node the checks are
     * hard assertions inside the predicate, so a mismatch fails the test right away.
     *
     * @param upstreamParallelism expected parallelism of the "CacheWrite" node
     * @param cacheTransformation transformation whose dataset id the node's in-edge must produce
     * @param streamGraph graph to inspect
     * @param expectedTagId expected output tag id on the node's in-edge; {@code null} skips the
     *     tag check
     */
    private void verifyCacheProduceNode(int upstreamParallelism, CacheTransformation<Integer> cacheTransformation, StreamGraph streamGraph, String expectedTagId) {
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).anyMatch(candidate -> {
            boolean isCacheWrite = "CacheWrite".equals(candidate.getOperatorName());
            if (isCacheWrite) {
                Assertions.assertThat((int)candidate.getParallelism()).isEqualTo(upstreamParallelism);

                // The write node is expected to have exactly one forward in-edge.
                Assertions.assertThat((int)candidate.getInEdges().size()).isEqualTo(1);
                StreamEdge incoming = (StreamEdge)candidate.getInEdges().get(0);
                Assertions.assertThat((Object)incoming.getPartitioner()).isInstanceOf(ForwardPartitioner.class);

                if (expectedTagId != null) {
                    Assertions.assertThat((String)incoming.getOutputTag().getId()).isEqualTo(expectedTagId);
                }

                // The produced dataset id is wrapped in a plain AbstractID before comparing.
                Assertions.assertThat((Comparable)incoming.getIntermediateDatasetIdToProduce()).isNotNull();
                Assertions.assertThat((Comparable)new AbstractID((AbstractID)incoming.getIntermediateDatasetIdToProduce())).isEqualTo((Object)cacheTransformation.getDatasetId());
            }
            return isCacheWrite;
        });
    }

    /**
     * Generates the stream graph from {@code env} and asserts that it contains a node named
     * "CacheRead" that consumes the given cache.
     *
     * <p>Nodes with any other operator name are skipped. For a "CacheRead" node the checks are
     * hard assertions inside the predicate, so a mismatch fails the test right away.
     *
     * @param env environment whose stream graph is generated and inspected
     * @param upstreamParallelism expected parallelism of the "CacheRead" node
     * @param cacheTransformation transformation whose dataset id the node must consume
     */
    private void verifyCacheConsumeNode(StreamExecutionEnvironment env, int upstreamParallelism, CacheTransformation<Integer> cacheTransformation) {
        Assertions.assertThat((Collection)env.getStreamGraph().getStreamNodes()).anyMatch(candidate -> {
            boolean isCacheRead = "CacheRead".equals(candidate.getOperatorName());
            if (isCacheRead) {
                Assertions.assertThat((int)candidate.getParallelism()).isEqualTo(upstreamParallelism);
                // The consumed dataset id is wrapped in a plain AbstractID before comparing.
                Assertions.assertThat((Comparable)new AbstractID((AbstractID)candidate.getConsumeClusterDatasetId())).isEqualTo((Object)cacheTransformation.getDatasetId());
            }
            return isCacheRead;
        });
    }

    /**
     * Execution environment for the cache tests that lets a test declare which cluster dataset
     * ids count as completed.
     */
    private static class TestingStreamExecutionEnvironment extends StreamExecutionEnvironment {
        /** Dataset ids registered through {@link #addCompletedClusterDatasetIds(AbstractID)}. */
        Set<AbstractID> completedClusterDatasetIds = new HashSet<>();

        private TestingStreamExecutionEnvironment() {
        }

        /** Registers {@code id} as a completed cluster dataset. */
        public void addCompletedClusterDatasetIds(AbstractID id) {
            completedClusterDatasetIds.add(id);
        }

        /**
         * Returns a fresh copy of the registered ids, so callers cannot modify the internal set.
         */
        public Set<AbstractID> listCompletedClusterDatasets() {
            Set<AbstractID> snapshot = new HashSet<>(completedClusterDatasetIds);
            return snapshot;
        }
    }

    /**
     * Placeholder operator factory: every method throws
     * {@link UnsupportedOperationException}, so it can be referenced but not actually used.
     */
    private static class MultipleInputOperatorFactory implements StreamOperatorFactory<String> {
        private MultipleInputOperatorFactory() {
        }

        /** Not supported by this placeholder. */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this placeholder. */
        public ChainingStrategy getChainingStrategy() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this placeholder. */
        public void setChainingStrategy(ChainingStrategy strategy) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this placeholder. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Matcher that accepts a {@link ResourceSpec} when it and the expected spec are each
     * {@code lessThanOrEqual} to the other.
     */
    private static class EqualsResourceSpecMatcher extends TypeSafeMatcher<ResourceSpec> {
        /** The spec that candidates are compared against. */
        private final ResourceSpec resources;

        EqualsResourceSpecMatcher(ResourceSpec resources) {
            this.resources = resources;
        }

        public void describeTo(Description description) {
            description.appendText("expected resource spec ");
            description.appendValue(resources);
        }

        protected boolean matchesSafely(ResourceSpec item) {
            // Check expected <= item first; only then check item <= expected.
            if (!resources.lessThanOrEqual(item)) {
                return false;
            }
            return item.lessThanOrEqual(resources);
        }
    }

    /** Co-map function over two {@code Long} inputs that returns each value unchanged. */
    static class NoOpLongCoMap implements CoMapFunction<Long, Long, Long> {
        private static final long serialVersionUID = 1886595528149124270L;

        NoOpLongCoMap() {
        }

        /** Returns the first-input value as is. */
        public Long map1(Long value) throws Exception {
            final Long unchanged = value;
            return unchanged;
        }

        /** Returns the second-input value as is. */
        public Long map2(Long value) throws Exception {
            final Long unchanged = value;
            return unchanged;
        }
    }

    /**
     * One-input operator that forwards every record and remembers the type information handed
     * to {@link #setOutputType(TypeInformation, ExecutionConfig)} so a test can read it back.
     */
    private static class OutputTypeConfigurableOperationWithOneInput
            extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {
        private static final long serialVersionUID = 1L;

        /** Last value passed to {@code setOutputType}; stays {@code null} until then. */
        TypeInformation<Integer> tpeInformation;

        private OutputTypeConfigurableOperationWithOneInput() {
        }

        /** Stores the supplied output type; the execution config is ignored. */
        public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
            tpeInformation = outTypeInfo;
        }

        /** Returns the type information captured by {@code setOutputType}. */
        public TypeInformation<Integer> getTypeInformation() {
            return tpeInformation;
        }

        /** Forwards the record to the output unchanged. */
        public void processElement(StreamRecord<Integer> element) {
            output.collect(element);
        }

        /** Watermarks are ignored. */
        public void processWatermark(Watermark mark) {
        }

        /** Latency markers are ignored. */
        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        }
    }

    /**
     * Two-input operator that forwards records from both inputs and remembers the type
     * information handed to {@link #setOutputType(TypeInformation, ExecutionConfig)} so a test
     * can read it back.
     */
    static class OutputTypeConfigurableOperationWithTwoInputs
            extends AbstractStreamOperator<Integer>
            implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
        private static final long serialVersionUID = 1L;

        /** Last value passed to {@code setOutputType}; stays {@code null} until then. */
        TypeInformation<Integer> tpeInformation;

        OutputTypeConfigurableOperationWithTwoInputs() {
        }

        /** Stores the supplied output type; the execution config is ignored. */
        public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
            tpeInformation = outTypeInfo;
        }

        /** Returns the type information captured by {@code setOutputType}. */
        public TypeInformation<Integer> getTypeInformation() {
            return tpeInformation;
        }

        /** Forwards a record from the first input unchanged. */
        public void processElement1(StreamRecord<Integer> element) throws Exception {
            output.collect(element);
        }

        /** Forwards a record from the second input unchanged. */
        public void processElement2(StreamRecord<Integer> element) throws Exception {
            output.collect(element);
        }

        /** Watermarks from the first input are ignored. */
        public void processWatermark1(Watermark mark) throws Exception {
        }

        /** Watermarks from the second input are ignored. */
        public void processWatermark2(Watermark mark) throws Exception {
        }

        /** Latency markers from the first input are ignored. */
        public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
        }

        /** Latency markers from the second input are ignored. */
        public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
        }

        /**
         * Intentionally does nothing with its arguments.
         *
         * <p>NOTE(review): presumably replaces the superclass setup so no real task wiring is
         * needed in tests - confirm against {@code AbstractStreamOperator}.
         */
        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
        }
    }

    /**
     * UDF-carrying one-input operator that passes each record straight through to its output.
     *
     * @param <T> element type of both input and output
     */
    static class NoOpUdfOperator<T> extends AbstractUdfStreamOperator<T, Function>
            implements OneInputStreamOperator<T, T> {
        /** Hands the user function to the superclass constructor. */
        NoOpUdfOperator(Function function) {
            super(function);
        }

        /** Forwards the record to the output unchanged. */
        public void processElement(StreamRecord<T> element) throws Exception {
            output.collect(element);
        }
    }

    /**
     * Function that only records the type information handed to
     * {@link #setOutputType(TypeInformation, ExecutionConfig)} so a test can read it back.
     *
     * @param <T> output element type
     */
    private static class OutputTypeConfigurableFunction<T> implements OutputTypeConfigurable<T>, Function {
        /** Last value passed to {@code setOutputType}; stays {@code null} until then. */
        private TypeInformation<T> typeInformation;

        private OutputTypeConfigurableFunction() {
        }

        /** Stores the supplied output type; the execution config is ignored. */
        public void setOutputType(TypeInformation<T> outTypeInfo, ExecutionConfig executionConfig) {
            typeInformation = outTypeInfo;
        }

        /** Returns the type information captured by {@code setOutputType}. */
        public TypeInformation<T> getTypeInformation() {
            return typeInformation;
        }
    }

    /**
     * Input-less transformation named "FailingTransformation" with a caller-chosen hash code
     * and an {@code equals} that accepts any object.
     *
     * <p>Both overrides are deliberate: they let a test create an instance that collides with
     * another transformation under {@code equals}/{@code hashCode} while being a different
     * object.
     */
    private static class FailingTransformation extends Transformation<String> {
        /** Value reported by {@link #hashCode()}. */
        private final int hashCode;

        /**
         * @param hashCode the hash code this instance should report
         */
        FailingTransformation(int hashCode) {
            super("FailingTransformation", BasicTypeInfo.STRING_TYPE_INFO, 1);
            this.hashCode = hashCode;
        }

        /** This transformation has no inputs. */
        public List<Transformation<?>> getInputs() {
            return Collections.emptyList();
        }

        /** This transformation has no predecessors. */
        protected List<Transformation<?>> getTransitivePredecessorsInternal() {
            return Collections.emptyList();
        }

        /** Returns the hash code supplied at construction time. */
        public int hashCode() {
            return hashCode;
        }

        /** Always reports equality, regardless of the argument. */
        public boolean equals(Object o) {
            return true;
        }
    }
}

