package org.apache.flink.streaming.api.graph;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.JobGraphGeneratorTestBase;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.class */
public class AdaptiveGraphManagerTest extends JobGraphGeneratorTestBase {
    /**
     * Builds the job graph for the shared base-class tests by running the given stream graph
     * through {@link #generateJobGraphInLazilyMode(StreamGraph)}.
     *
     * @param streamGraph the stream graph to translate
     * @return the job graph returned by the lazy generation helper
     */
    @Override
    JobGraph createJobGraph(StreamGraph streamGraph) {
        return AdaptiveGraphManagerTest.generateJobGraphInLazilyMode(streamGraph);
    }

    /**
     * Verifies the managed-memory fractions stored in the configurations of three job vertices.
     *
     * <p>The first vertex is checked twice: once for its own {@link StreamConfig} and once for the
     * single chained task config it carries. The second and third vertices are checked through
     * their own configs only. The numeric expectations are handed to {@code verifyFractions}
     * unchanged.
     *
     * @param jobVertex first vertex; must carry exactly one transitive chained task config
     * @param jobVertex2 second vertex
     * @param jobVertex3 third vertex
     * @param configuration configuration forwarded to every {@code verifyFractions} call
     */
    @Override
    void verifyManagedMemoryFractionForUnknownResourceSpec(JobVertex jobVertex, JobVertex jobVertex2, JobVertex jobVertex3, Configuration configuration) {
        ClassLoader baseClassLoader = JobGraphGeneratorTestBase.class.getClassLoader();

        // Head operator of the first vertex.
        StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
        verifyFractions(headConfig, 0.3d, 0.0d, 0.0d, configuration);

        // The only operator chained behind that head; getOnlyElement fails if there is not exactly one.
        StreamConfig chainedConfig =
                Iterables.getOnlyElement(
                        headConfig.getTransitiveChainedTaskConfigs(baseClassLoader).values());
        verifyFractions(chainedConfig, 0.3d, 0.4d, 0.0d, configuration);

        StreamConfig secondVertexConfig = new StreamConfig(jobVertex2.getConfiguration());
        verifyFractions(secondVertexConfig, 0.0d, 1.0d, 0.0d, configuration);

        StreamConfig thirdVertexConfig = new StreamConfig(jobVertex3.getConfiguration());
        verifyFractions(thirdVertexConfig, 1.0d, 0.0d, 0.0d, configuration);
    }

    /**
     * Drives an {@link AdaptiveGraphManager} over a {@code source -> map -> keyBy -> map -> sink}
     * pipeline and checks that job vertices appear step by step.
     *
     * <p>The initial job graph must contain one vertex. After every vertex has been reported as
     * finished, the graph must contain two vertices, no operators may be pending, and the
     * manager's lookups (stream node ids per vertex, producer node and output edges per
     * intermediate data set, forward group of the first vertex) must match the stream nodes
     * sorted by id.
     */
    @Test
    void testCreateJobVertexLazily() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromData("a", "b", "c", "d", "e", "f")
                .map(
                        new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String value) {
                                return new Tuple2<>(value, value);
                            }
                        })
                .keyBy(pair -> pair.f0)
                .map(
                        new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(Tuple2<String, String> pair) {
                                return pair;
                            }
                        })
                .sinkTo(new DiscardingSink<>());

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setDynamic(true);
        AdaptiveGraphManager manager =
                new AdaptiveGraphManager(
                        Thread.currentThread().getContextClassLoader(), streamGraph, Runnable::run);
        JobGraph jobGraph = manager.getJobGraph();

        // Only one vertex exists before anything has been reported as finished.
        List<JobVertex> frontier = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(frontier.size()).isEqualTo(1);

        // Report each batch of vertices as finished until the manager returns no new ones.
        while (!frontier.isEmpty()) {
            List<JobVertex> discovered = new ArrayList<>();
            for (JobVertex finished : frontier) {
                discovered.addAll(manager.onJobVertexFinished(finished.getID()));
            }
            frontier = discovered;
        }

        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        List<StreamNode> nodesById =
                streamGraph.getStreamNodes().stream()
                        .sorted(Comparator.comparingInt(StreamNode::getId))
                        .collect(Collectors.toList());
        Assertions.assertThat(vertices.size()).isEqualTo(2);
        Assertions.assertThat(manager.getPendingOperatorsCount()).isEqualTo(0);

        // First vertex: expected to list the second-lowest node id before the lowest one.
        JobVertex firstVertex = vertices.get(0);
        StreamNode firstNode = nodesById.get(0);
        StreamNode secondNode = nodesById.get(1);
        Assertions.assertThat(manager.getStreamNodeIdsByJobVertexId(firstVertex.getID()))
                .isEqualTo(List.of(secondNode.getId(), firstNode.getId()));

        // Second vertex: same pattern for the two highest node ids.
        JobVertex secondVertex = vertices.get(1);
        StreamNode thirdNode = nodesById.get(2);
        StreamNode fourthNode = nodesById.get(3);
        Assertions.assertThat(manager.getStreamNodeIdsByJobVertexId(secondVertex.getID()))
                .isEqualTo(List.of(fourthNode.getId(), thirdNode.getId()));

        // The first vertex's first produced data set is attributed to the second node.
        IntermediateDataSet producedDataSet = firstVertex.getProducedDataSets().get(0);
        Assertions.assertThat(manager.getProducerStreamNodeId(producedDataSet.getId()))
                .isEqualTo(secondNode.getId());
        Assertions.assertThat(manager.getOutputStreamEdges(producedDataSet.getId()))
                .isEqualTo(secondNode.getOutEdges());

        // The forward group of the first vertex holds exactly the two lowest node ids.
        Set<Integer> forwardGroupIds =
                manager.getStreamNodeForwardGroupByVertexId(firstVertex.getID()).getVertexIds();
        Assertions.assertThat(forwardGroupIds.size()).isEqualTo(2);
        Assertions.assertThat(forwardGroupIds.contains(firstNode.getId())).isTrue();
        Assertions.assertThat(forwardGroupIds.contains(secondNode.getId())).isTrue();
    }

    /**
     * Checks that the job graph produced by {@link #generateJobGraphInLazilyMode(StreamGraph)} is
     * equivalent to the one produced by {@code StreamingJobGraphGenerator.createJobGraph} for the
     * same {@code source -> map -> keyBy -> map -> sink} stream graph.
     */
    @Test
    void testTheCorrectnessOfJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromData("a", "b", "c", "d", "e", "f")
                .map(
                        new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String value) {
                                return new Tuple2<>(value, value);
                            }
                        })
                .keyBy(pair -> pair.f0)
                .map(
                        new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(Tuple2<String, String> pair) {
                                return pair;
                            }
                        })
                .sinkTo(new DiscardingSink<>());

        StreamGraph streamGraph = env.getStreamGraph();

        // Build the lazily generated graph first, then the reference graph, from the same input.
        JobGraph lazilyGeneratedGraph = generateJobGraphInLazilyMode(streamGraph);
        JobGraph referenceGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

        boolean equivalent = isJobGraphEquivalent(lazilyGeneratedGraph, referenceGraph);
        Assertions.assertThat(equivalent).isEqualTo(true);
    }

    /**
     * Feeds three number-sequence sources (parallelism 100, 1 and 1) into a multiple-input
     * transformation that uses {@link ChainingStrategy#HEAD_WITH_SOURCES}, attaches a rebalanced
     * discarding sink, and expects the resulting dynamic job graph to contain four vertices.
     */
    @Test
    void testSourceChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(100);
        env.setParallelism(100);

        // The multiple-input transformation is created before its inputs, as in the original order.
        MultipleInputTransformation<Long> multiInput =
                new MultipleInputTransformation<>(
                        "mit", new JobGraphGeneratorTestBase.UnusedOperatorFactory(), Types.LONG, -1);

        Transformation<Long> wideInput =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input1")
                        .setParallelism(100)
                        .getTransformation();
        Transformation<Long> narrowInputA =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input2")
                        .setParallelism(1)
                        .getTransformation();
        Transformation<Long> narrowInputB =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input3")
                        .setParallelism(1)
                        .getTransformation();

        multiInput.addInput(wideInput);
        multiInput.addInput(narrowInputA);
        multiInput.addInput(narrowInputB);
        multiInput.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);

        DataStream<Long> merged = new DataStream<>(env, multiInput);
        merged.rebalance().sinkTo(new DiscardingSink<>()).name("sink");
        env.addOperator(multiInput);

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setDynamic(true);

        JobGraph jobGraph = createJobGraph(streamGraph);
        Assertions.assertThat(jobGraph.getVerticesSortedTopologicallyFromSources().size())
                .isEqualTo(4);
    }

    /**
     * Generates a job graph through an {@link AdaptiveGraphManager} by reporting vertices as
     * finished until no new ones are returned.
     *
     * <p>Starting from the vertices already present in the manager's job graph, every vertex of the
     * current batch is passed to {@code onJobVertexFinished}; the vertices returned by those calls
     * form the next batch. The loop ends when a batch is empty.
     *
     * @param streamGraph the stream graph handed to the manager
     * @return the manager's job graph after the loop has finished
     */
    private static JobGraph generateJobGraphInLazilyMode(StreamGraph streamGraph) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Runnable::run executes submitted work synchronously on the calling thread.
        AdaptiveGraphManager manager =
                new AdaptiveGraphManager(classLoader, streamGraph, Runnable::run);
        JobGraph jobGraph = manager.getJobGraph();

        List<JobVertex> currentBatch = jobGraph.getVerticesSortedTopologicallyFromSources();
        while (!currentBatch.isEmpty()) {
            List<JobVertex> nextBatch = new ArrayList<>();
            for (JobVertex finishedVertex : currentBatch) {
                nextBatch.addAll(manager.onJobVertexFinished(finishedVertex.getID()));
            }
            currentBatch = nextBatch;
        }
        return jobGraph;
    }

    /**
     * Asserts that two job graphs agree on their graph-level settings and, position by position in
     * topological order, on the properties of every job vertex.
     *
     * <p>Every comparison is an AssertJ assertion, so a mismatch surfaces as an assertion failure
     * rather than a {@code false} return value.
     *
     * @param jobGraph the first job graph
     * @param jobGraph2 the second job graph
     * @return {@code true} whenever all assertions pass
     */
    private static boolean isJobGraphEquivalent(JobGraph jobGraph, JobGraph jobGraph2) {
        // Graph-level settings.
        Assertions.assertThat(jobGraph.getJobConfiguration()).isEqualTo(jobGraph2.getJobConfiguration());
        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(jobGraph2.getJobType());
        Assertions.assertThat(jobGraph.isDynamic()).isEqualTo(jobGraph2.isDynamic());
        Assertions.assertThat(jobGraph.isApproximateLocalRecoveryEnabled()).isEqualTo(jobGraph2.isApproximateLocalRecoveryEnabled());
        Assertions.assertThat(jobGraph.getSerializedExecutionConfig()).isEqualTo(jobGraph2.getSerializedExecutionConfig());
        // Checkpointing settings are compared through their string form.
        Assertions.assertThat(jobGraph.getCheckpointingSettings().toString()).isEqualTo(jobGraph2.getCheckpointingSettings().toString());
        Assertions.assertThat(jobGraph.getSavepointRestoreSettings()).isEqualTo(jobGraph2.getSavepointRestoreSettings());
        Assertions.assertThat(jobGraph.getUserJars()).isEqualTo(jobGraph2.getUserJars());
        Assertions.assertThat(jobGraph.getUserArtifacts()).isEqualTo(jobGraph2.getUserArtifacts());
        Assertions.assertThat(jobGraph.getUserJarBlobKeys()).isEqualTo(jobGraph2.getUserJarBlobKeys());
        Assertions.assertThat(jobGraph.getClasspaths()).isEqualTo(jobGraph2.getClasspaths());
        Assertions.assertThat(jobGraph.getJobStatusHooks()).isEqualTo(jobGraph2.getJobStatusHooks());

        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        List<JobVertex> vertices2 = jobGraph2.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(vertices.size()).isEqualTo(vertices2.size());

        // Start at index 0 so the first vertex is compared as well; starting at 1 would leave it
        // unchecked and would skip all per-vertex checks for single-vertex graphs.
        for (int i = 0; i < vertices.size(); i++) {
            JobVertex jobVertex = vertices.get(i);
            JobVertex jobVertex2 = vertices2.get(i);
            Assertions.assertThat(jobVertex.getID()).isEqualTo(jobVertex2.getID());
            Assertions.assertThat(jobVertex.getInputs().size()).isEqualTo(jobVertex2.getInputs().size());
            Assertions.assertThat(jobVertex.getParallelism()).isEqualTo(jobVertex2.getParallelism());
            Assertions.assertThat(jobVertex.getMaxParallelism()).isEqualTo(jobVertex2.getMaxParallelism());
            Assertions.assertThat(jobVertex.getMinResources()).isEqualTo(jobVertex2.getMinResources());
            Assertions.assertThat(jobVertex.getPreferredResources()).isEqualTo(jobVertex2.getPreferredResources());
            Assertions.assertThat(jobVertex.getInvokableClassName()).isEqualTo(jobVertex2.getInvokableClassName());
            Assertions.assertThat(jobVertex.getName()).isEqualTo(jobVertex2.getName());
            Assertions.assertThat(jobVertex.getOperatorName()).isEqualTo(jobVertex2.getOperatorName());
            Assertions.assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isEqualTo(jobVertex2.isSupportsConcurrentExecutionAttempts());
            Assertions.assertThat(jobVertex.isAnyOutputBlocking()).isEqualTo(jobVertex2.isAnyOutputBlocking());
            Assertions.assertThat(jobVertex.isParallelismConfigured()).isEqualTo(jobVertex2.isParallelismConfigured());
        }
        return true;
    }

    /**
     * Forwards to the superclass implementation without changing any argument.
     *
     * @param z boolean flag passed through to the superclass
     * @param str string argument passed through to the superclass
     * @param strArr string array passed through to the superclass
     * @return the job graph created by the superclass
     */
    @Override
    public JobGraph createGraphWithMultipleInputs(boolean z, String str, String[] strArr) {
        JobGraph graphFromBase = super.createGraphWithMultipleInputs(z, str, strArr);
        return graphFromBase;
    }

    /**
     * Recreates one of this test class's key-selector lambdas from its serialized description.
     *
     * <p>The implementation method name of {@code serializedLambda} is matched (hash code first,
     * then exact string) against the two lambda names listed below. The remaining descriptor
     * fields are then validated before a fresh lambda is returned.
     *
     * <p>NOTE(review): the {@code synthetic} marker, the {@code boolean z = -1} initializer and the
     * {@code switch} over a boolean suggest this is decompiler output for a compiler-generated
     * method rather than hand-written source - confirm whether it belongs in the source file.
     *
     * @param serializedLambda descriptor of the lambda to recreate
     * @return a lambda that returns field {@code f0} of its tuple argument
     * @throws IllegalArgumentException if the descriptor matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; keeps its initial value when no name matches.
        boolean z = -1;
        // First stage: narrow down by hash code, then confirm with an exact name comparison.
        switch (implMethodName.hashCode()) {
            case -612272042:
                if (implMethodName.equals("lambda$testCreateJobVertexLazily$9747a172$1")) {
                    z = true;
                    break;
                }
                break;
            case 1298449686:
                if (implMethodName.equals("lambda$testTheCorrectnessOfJobGraph$9747a172$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the rest of the descriptor and rebuild the matching lambda.
        switch (z) {
            case false:
                // Key selector used in testTheCorrectnessOfJobGraph.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple2 -> {
                        return (String) tuple2.f0;
                    };
                }
                break;
            case true:
                // Key selector used in testCreateJobVertexLazily.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/String;")) {
                    return tuple22 -> {
                        return (String) tuple22.f0;
                    };
                }
                break;
        }
        // Neither known lambda matched, or its descriptor failed validation.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
