package org.apache.flink.streaming.api.graph;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.class */
class StreamGraphGeneratorBatchExecutionTest {
    /**
     * No-op {@link KeyedProcessFunction} shared by the tests in this class; it gives the test
     * pipelines a keyed one-input operator without doing any work per element.
     */
    private static final KeyedProcessFunction<Integer, Integer, Integer> DUMMY_PROCESS_FUNCTION =
            new KeyedProcessFunction<Integer, Integer, Integer>() {
                // Only the typed override is declared. The erased bridge overload is emitted by
                // the compiler; spelling it out by hand clashes with this method after erasure.
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    // intentionally empty
                }
            };
    /**
     * No-op {@link KeyedCoProcessFunction} shared by the tests in this class; it gives the test
     * pipelines a keyed two-input operator without doing any work per element.
     */
    private static final KeyedCoProcessFunction<Integer, Integer, Integer, Integer>
            DUMMY_KEYED_CO_PROCESS_FUNCTION =
                    new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                        // Only the typed overrides are declared. The erased bridge overloads are
                        // emitted by the compiler; declaring them by hand clashes after erasure.
                        @Override
                        public void processElement1(
                                Integer value, Context ctx, Collector<Integer> out) {
                            // intentionally empty
                        }

                        @Override
                        public void processElement2(
                                Integer value, Context ctx, Collector<Integer> out) {
                            // intentionally empty
                        }
                    };

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest$InputSelectableMultipleInputOperator.class */
    private static final class InputSelectableMultipleInputOperator extends MultipleInputOperator implements InputSelectable {
        public InputSelectableMultipleInputOperator(StreamOperatorParameters<Integer> streamOperatorParameters, int i) {
            super(streamOperatorParameters, i);
        }

        public InputSelection nextSelection() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest$InputSelectableTwoInputOperator.class */
    private static final class InputSelectableTwoInputOperator extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer>, InputSelectable {
        private InputSelectableTwoInputOperator() {
        }

        public InputSelection nextSelection() {
            return null;
        }

        public void processElement1(StreamRecord<Integer> streamRecord) {
        }

        public void processElement2(StreamRecord<Integer> streamRecord) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest$MultipleInputOperator.class */
    private static class MultipleInputOperator extends AbstractStreamOperatorV2<Integer> implements MultipleInputStreamOperator<Integer> {
        public MultipleInputOperator(StreamOperatorParameters<Integer> streamOperatorParameters, int i) {
            super(streamOperatorParameters, i);
        }

        public List<Input> getInputs() {
            return Collections.emptyList();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest$MultipleInputOperatorFactory.class */
    private static final class MultipleInputOperatorFactory extends AbstractStreamOperatorFactory<Integer> {
        private final int inputsCount;
        private final boolean selectable;

        private MultipleInputOperatorFactory(int i, boolean z) {
            this.inputsCount = i;
            this.selectable = z;
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> streamOperatorParameters) {
            return this.selectable ? new InputSelectableMultipleInputOperator(streamOperatorParameters, this.inputsCount) : new MultipleInputOperator(streamOperatorParameters, this.inputsCount);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return this.selectable ? InputSelectableMultipleInputOperator.class : MultipleInputOperator.class;
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    StreamGraphGeneratorBatchExecutionTest() {}

    /**
     * Checks which {@link GlobalStreamExchangeMode} the generated graph reports for several
     * combinations of runtime execution mode and configured batch shuffle mode.
     */
    @Test
    void testShuffleMode() {
        // AUTOMATIC runtime mode with a blocking shuffle.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.AUTOMATIC,
                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
                GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

        // STREAMING runtime mode: the blocking shuffle setting is expected to yield pipelined.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.STREAMING,
                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
                GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);

        // BATCH runtime mode with each of the remaining shuffle modes.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_PIPELINED,
                GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
                GlobalStreamExchangeMode.ALL_EDGES_HYBRID_FULL);
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
                GlobalStreamExchangeMode.ALL_EDGES_HYBRID_SELECTIVE);
    }

    /** A graph generated in batch mode must report {@link JobType#BATCH}. */
    @Test
    void testBatchJobType() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSink<Integer> sink = addDummyPipeline(env);

        final StreamGraph graph = getStreamGraphInBatchMode(sink);

        Assertions.assertThat(graph.getJobType()).isEqualTo(JobType.BATCH);
    }

    /**
     * With default configuration, the keyed operator's OPERATOR-scope managed memory weight must
     * equal the default of {@link ExecutionOptions#SORTED_INPUTS_MEMORY} in MiB, and its only
     * slot-scope use case must be STATE_BACKEND.
     */
    @Test
    void testManagedMemoryWeights() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<Integer> keyedOperator =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final StreamNode operatorNode =
                getStreamGraphInBatchMode(sink).getStreamNode(keyedOperator.getId());

        final int defaultWeight =
                ExecutionOptions.SORTED_INPUTS_MEMORY.defaultValue().getMebiBytes();
        Assertions.assertThat(operatorNode.getManagedMemoryOperatorScopeUseCaseWeights())
                .isEqualTo(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, defaultWeight));
        Assertions.assertThat(operatorNode.getManagedMemorySlotScopeUseCases())
                .containsOnly(ManagedMemoryUseCase.STATE_BACKEND);
    }

    /**
     * When {@link ExecutionOptions#SORTED_INPUTS_MEMORY} is set to 42 MiB, the keyed operator's
     * OPERATOR-scope managed memory weight must be 42; STATE_BACKEND stays the only slot-scope
     * use case.
     */
    @Test
    void testCustomManagedMemoryWeights() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<Integer> keyedOperator =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.ofMebiBytes(42L));

        final StreamNode operatorNode =
                getStreamGraphInBatchMode(sink, config).getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getManagedMemoryOperatorScopeUseCaseWeights())
                .isEqualTo(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 42));
        Assertions.assertThat(operatorNode.getManagedMemorySlotScopeUseCases())
                .containsOnly(ManagedMemoryUseCase.STATE_BACKEND);
    }

    /**
     * Default batch settings for a keyed one-input operator: sorted input, HEAD chaining
     * strategy, the batch state backend, and a timer service provider on the graph.
     */
    @Test
    void testOneInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<Integer> keyedOperator =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final StreamGraph graph = getStreamGraphInBatchMode(sink);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * With {@link ExecutionOptions#USE_BATCH_STATE_BACKEND} switched off, the one-input operator
     * is still sorted and HEAD-chained, but the graph carries neither a state backend nor a timer
     * service provider.
     */
    @Test
    void testDisablingStateBackendOneInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<Integer> keyedOperator =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);

        final StreamGraph graph = getStreamGraphInBatchMode(sink, config);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * With both the batch state backend and input sorting switched off, the one-input operator
     * has no input requirement at index 0 and the graph has no state backend or timer service
     * provider.
     */
    @Test
    void testDisablingSortingInputsOneInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<Integer> keyedOperator =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        config.set(ExecutionOptions.SORT_INPUTS, false);

        final StreamGraph graph = getStreamGraphInBatchMode(sink, config);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0)).isNull();
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Disabling input sorting while leaving the batch state backend setting untouched must make
     * graph generation fail with an {@link IllegalStateException}.
     */
    @Test
    void testDisablingSortingInputsWithoutBatchStateBackendOneInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSink<Integer> sink =
                env.fromData(1, 2)
                        .keyBy(Integer::intValue)
                        .process(DUMMY_PROCESS_FUNCTION)
                        .sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.SORT_INPUTS, false);

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink, config))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend requires the sorted inputs to be enabled!");
    }

    /**
     * Default batch settings for a keyed two-input operator: both inputs sorted, HEAD chaining
     * strategy, the batch state backend, and a timer service provider on the graph.
     */
    @Test
    void testTwoInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final SingleOutputStreamOperator<Integer> keyedOperator =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final StreamGraph graph = getStreamGraphInBatchMode(sink);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * With {@link ExecutionOptions#USE_BATCH_STATE_BACKEND} switched off, both inputs of the
     * two-input operator are still sorted and it is HEAD-chained, but the graph carries neither a
     * state backend nor a timer service provider.
     */
    @Test
    void testDisablingStateBackendTwoInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final SingleOutputStreamOperator<Integer> keyedOperator =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);

        final StreamGraph graph = getStreamGraphInBatchMode(sink, config);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * With both the batch state backend and input sorting switched off, the two-input operator
     * has no input requirement at index 0 or 1 and the graph has no state backend or timer
     * service provider.
     */
    @Test
    void testDisablingSortingInputsTwoInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final SingleOutputStreamOperator<Integer> keyedOperator =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        final DataStreamSink<Integer> sink = keyedOperator.sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        config.set(ExecutionOptions.SORT_INPUTS, false);

        final StreamGraph graph = getStreamGraphInBatchMode(sink, config);
        final StreamNode operatorNode = graph.getStreamNode(keyedOperator.getId());

        Assertions.assertThat(operatorNode.getInputRequirements().get(0)).isNull();
        Assertions.assertThat(operatorNode.getInputRequirements().get(1)).isNull();
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Two-input counterpart of the sorting-disabled check: disabling input sorting while leaving
     * the batch state backend setting untouched must fail with an {@link IllegalStateException}.
     */
    @Test
    void testDisablingSortingInputsWithoutBatchStateBackendTwoInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final DataStreamSink<Integer> sink =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION)
                        .sinkTo(new DiscardingSink<>());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.SORT_INPUTS, false);

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink, config))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend requires the sorted inputs to be enabled!");
    }

    /**
     * A graph containing a two-input operator that implements {@link InputSelectable} must be
     * rejected in batch mode with an {@link IllegalStateException}.
     */
    @Test
    void testInputSelectableTwoInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The sources are declared with their element type. With a raw DataStreamSource the whole
        // connect(...).keyBy(...) chain becomes raw and the key selectors no longer type-check.
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);

        final SingleOutputStreamOperator<Integer> coProcessed =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);

        // Second keyed connect feeds the InputSelectable operator; elements1 is reused as input.
        final DataStreamSink<Integer> sink =
                coProcessed
                        .connect(elements1)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .transform(
                                "operator",
                                BasicTypeInfo.INT_TYPE_INFO,
                                new InputSelectableTwoInputOperator())
                        .sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
    }

    /**
     * Default batch settings for a keyed multiple-input operator with three keyed inputs: every
     * input sorted, HEAD chaining strategy, the batch state backend, and a timer service provider
     * on the graph.
     */
    @Test
    void testMultiInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements3 = env.fromData(1, 2);

        final KeyedMultipleInputTransformation<Integer> multipleInputTransformation =
                new KeyedMultipleInputTransformation<>(
                        "operator",
                        new MultipleInputOperatorFactory(3, false),
                        BasicTypeInfo.INT_TYPE_INFO,
                        1,
                        BasicTypeInfo.INT_TYPE_INFO);
        // Each input is keyed by the element itself.
        multipleInputTransformation.addInput(elements1.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements2.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements3.getTransformation(), e -> e);

        final DataStreamSink<Integer> sink =
                new MultipleConnectedStreams(env)
                        .transform(multipleInputTransformation)
                        .sinkTo(new DiscardingSink<>());

        final StreamGraph graph = getStreamGraphInBatchMode(sink);
        final StreamNode operatorNode = graph.getStreamNode(multipleInputTransformation.getId());

        // All three keyed inputs are checked, including index 2, so a regression on the last
        // input cannot slip through.
        Assertions.assertThat(operatorNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getInputRequirements().get(2))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * A graph containing a keyed multiple-input operator that implements {@link InputSelectable}
     * must be rejected in batch mode with an {@link IllegalStateException}.
     */
    @Test
    void testInputSelectableMultiInputTransformation() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        final DataStreamSource<Integer> elements3 = env.fromData(1, 2);

        // "true" selects the InputSelectable operator variant.
        final KeyedMultipleInputTransformation<Integer> multipleInputTransformation =
                new KeyedMultipleInputTransformation<>(
                        "operator",
                        new MultipleInputOperatorFactory(3, true),
                        BasicTypeInfo.INT_TYPE_INFO,
                        1,
                        BasicTypeInfo.INT_TYPE_INFO);
        multipleInputTransformation.addInput(elements1.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements2.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements3.getTransformation(), e -> e);

        final DataStreamSink<Integer> sink =
                new MultipleConnectedStreams(env)
                        .transform(multipleInputTransformation)
                        .sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
    }

    /**
     * Asserts that generating a BATCH-mode graph from the given transformations fails with an
     * {@link UnsupportedOperationException} about iterations.
     *
     * @param transformationArr transformations handed to the {@link StreamGraphGenerator}
     */
    private void testNoSupportForIterationsInBatchHelper(Transformation<?>... transformationArr) {
        final List<Transformation<?>> transformations = new ArrayList<>();
        Collections.addAll(transformations, transformationArr);

        final Configuration batchConfig = new Configuration();
        batchConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        final StreamGraphGenerator generator =
                new StreamGraphGenerator(
                        transformations, new ExecutionConfig(), new CheckpointConfig(), batchConfig);

        Assertions.assertThatThrownBy(() -> generator.generate())
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Iterations are not supported in BATCH execution mode.");
    }

    /**
     * Generates a graph for the dummy pipeline under the given runtime and shuffle modes and
     * asserts the resulting global stream exchange mode.
     *
     * @param runtimeExecutionMode value for {@link ExecutionOptions#RUNTIME_MODE}
     * @param batchShuffleMode value for {@link ExecutionOptions#BATCH_SHUFFLE_MODE}
     * @param globalStreamExchangeMode exchange mode the generated graph is expected to report
     */
    private void testGlobalStreamExchangeMode(
            RuntimeExecutionMode runtimeExecutionMode,
            BatchShuffleMode batchShuffleMode,
            GlobalStreamExchangeMode globalStreamExchangeMode) {
        final DataStreamSink<Integer> sink =
                addDummyPipeline(StreamExecutionEnvironment.getExecutionEnvironment());

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
        config.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);

        final StreamGraphGenerator generator =
                new StreamGraphGenerator(
                        Collections.singletonList(sink.getTransformation()),
                        new ExecutionConfig(),
                        new CheckpointConfig(),
                        config);
        final StreamGraph graph = generator.generate();

        Assertions.assertThat(graph.getGlobalStreamExchangeMode())
                .isEqualTo(globalStreamExchangeMode);
    }

    /**
     * Builds the minimal pipeline used by several tests: a two-element source, keyed by the
     * element value, passed through the no-op process function, and written to a discarding sink.
     *
     * @param streamExecutionEnvironment environment the pipeline is added to
     * @return the sink terminating the pipeline
     */
    private DataStreamSink<Integer> addDummyPipeline(
            StreamExecutionEnvironment streamExecutionEnvironment) {
        final SingleOutputStreamOperator<Integer> processed =
                streamExecutionEnvironment
                        .fromData(1, 2)
                        .keyBy(Integer::intValue)
                        .process(DUMMY_PROCESS_FUNCTION);
        return processed.sinkTo(new DiscardingSink<>());
    }

    /**
     * Generates the BATCH-mode stream graph for the given sink, starting from an empty {@link
     * Configuration}.
     *
     * @param dataStreamSink sink whose transformation is translated
     * @return the generated stream graph
     */
    private StreamGraph getStreamGraphInBatchMode(DataStreamSink<?> dataStreamSink) {
        final Configuration emptyConfiguration = new Configuration();
        return getStreamGraphInBatchMode(dataStreamSink, emptyConfiguration);
    }

    /**
     * Generates the BATCH-mode stream graph for the given sink using the supplied configuration.
     *
     * <p>Note that the passed {@code configuration} is modified: {@link
     * ExecutionOptions#RUNTIME_MODE} is set to BATCH on it, after the execution and checkpoint
     * configs have been configured from it.
     *
     * @param dataStreamSink sink whose transformation is translated
     * @param configuration options applied to the execution config, the checkpoint config and the
     *     generator
     * @return the generated stream graph
     */
    private StreamGraph getStreamGraphInBatchMode(
            DataStreamSink<?> dataStreamSink, Configuration configuration) {
        final ExecutionConfig execConfig = new ExecutionConfig();
        execConfig.configure(configuration, StreamGraphGenerator.class.getClassLoader());

        final CheckpointConfig cpConfig = new CheckpointConfig();
        cpConfig.configure(configuration);

        // Force batch execution only once both configs above have read the configuration.
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        final StreamGraphGenerator generator =
                new StreamGraphGenerator(
                        Collections.singletonList(dataStreamSink.getTransformation()),
                        execConfig,
                        cpConfig,
                        configuration);
        return generator.generate();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1309233763:
                if (implMethodName.equals("lambda$testMultiInputTransformation$3558be8e$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1309233762:
                if (implMethodName.equals("lambda$testMultiInputTransformation$3558be8e$2")) {
                    z = 4;
                    break;
                }
                break;
            case -1309233761:
                if (implMethodName.equals("lambda$testMultiInputTransformation$3558be8e$3")) {
                    z = 5;
                    break;
                }
                break;
            case 556050114:
                if (implMethodName.equals("intValue")) {
                    z = 2;
                    break;
                }
                break;
            case 1009636667:
                if (implMethodName.equals("lambda$testInputSelectableMultiInputTransformation$3558be8e$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1009636668:
                if (implMethodName.equals("lambda$testInputSelectableMultiInputTransformation$3558be8e$2")) {
                    z = true;
                    break;
                }
                break;
            case 1009636669:
                if (implMethodName.equals("lambda$testInputSelectableMultiInputTransformation$3558be8e$3")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj3 -> {
                        return obj3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj2 -> {
                        return obj2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj -> {
                        return obj;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj22 -> {
                        return obj22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj32 -> {
                        return obj32;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj4 -> {
                        return obj4;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
