/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.AdaptiveGraphManager;
import org.apache.flink.streaming.api.graph.JobGraphGeneratorTestBase;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link AdaptiveGraphManager}.
 *
 * <p>The job graph under test is produced by creating an {@link AdaptiveGraphManager} and then
 * reporting every job vertex as finished until the manager returns no further vertices (see
 * {@link #generateJobGraphInLazilyMode(StreamGraph)}). Several tests compare that result against
 * the job graph produced by {@link StreamingJobGraphGenerator} for the same pipeline.
 */
public class AdaptiveGraphManagerTest extends JobGraphGeneratorTestBase {

    /** Builds the job graph for the shared base-class tests through the adaptive graph manager. */
    @Override
    JobGraph createJobGraph(StreamGraph streamGraph) {
        return generateJobGraphInLazilyMode(streamGraph);
    }

    /**
     * Verifies the managed memory fractions of the three given vertices.
     *
     * <p>The first vertex is expected to contain exactly one chained config besides its head
     * config; the other two vertices are checked through their head config only. The meaning of
     * the three numeric arguments is defined by {@code verifyFractions} in the base class.
     *
     * @param vertex1 vertex whose head config and single chained config are checked
     * @param vertex2 second vertex to check
     * @param vertex3 third vertex to check
     * @param taskManagerConfig configuration forwarded to {@code verifyFractions}
     */
    @Override
    void verifyManagedMemoryFractionForUnknownResourceSpec(
            JobVertex vertex1,
            JobVertex vertex2,
            JobVertex vertex3,
            Configuration taskManagerConfig) {
        StreamConfig sourceConfig = new StreamConfig(vertex1.getConfiguration());
        verifyFractions(sourceConfig, 0.3, 0.0, 0.0, taskManagerConfig);

        // getOnlyElement fails if vertex1 has zero or more than one chained config.
        StreamConfig map1Config =
                Iterables.getOnlyElement(
                        sourceConfig
                                .getTransitiveChainedTaskConfigs(
                                        JobGraphGeneratorTestBase.class.getClassLoader())
                                .values());
        verifyFractions(map1Config, 0.3, 0.4, 0.0, taskManagerConfig);

        StreamConfig map2Config = new StreamConfig(vertex2.getConfiguration());
        verifyFractions(map2Config, 0.0, 1.0, 0.0, taskManagerConfig);

        StreamConfig map3Config = new StreamConfig(vertex3.getConfiguration());
        verifyFractions(map3Config, 1.0, 0.0, 0.0, taskManagerConfig);
    }

    /**
     * Checks that the manager initially exposes a single job vertex, ends up with two once all
     * vertices have been reported as finished, and that its bookkeeping (stream node ids per
     * vertex, producer node and output edges per data set, forward group) matches the stream
     * graph.
     */
    @Test
    void testCreateJobVertexLazily() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        buildKeyedMapPipeline(env);

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setDynamic(true);
        AdaptiveGraphManager adaptiveGraphManager =
                new AdaptiveGraphManager(
                        Thread.currentThread().getContextClassLoader(),
                        streamGraph,
                        Runnable::run);
        JobGraph jobGraph = adaptiveGraphManager.getJobGraph();

        List<JobVertex> initialVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(initialVertices).hasSize(1);
        finishAllJobVertices(adaptiveGraphManager, initialVertices);

        List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        // Sort by id so the index-based expectations below are deterministic.
        List<StreamNode> streamNodes =
                streamGraph.getStreamNodes().stream()
                        .sorted(Comparator.comparingInt(StreamNode::getId))
                        .collect(Collectors.toList());

        Assertions.assertThat(jobVertices).hasSize(2);
        Assertions.assertThat(adaptiveGraphManager.getPendingOperatorsCount()).isEqualTo(0);

        JobVertex firstVertex = jobVertices.get(0);
        JobVertex secondVertex = jobVertices.get(1);
        Assertions.assertThat(adaptiveGraphManager.getStreamNodeIdsByJobVertexId(firstVertex.getID()))
                .isEqualTo(List.of(streamNodes.get(1).getId(), streamNodes.get(0).getId()));
        Assertions.assertThat(adaptiveGraphManager.getStreamNodeIdsByJobVertexId(secondVertex.getID()))
                .isEqualTo(List.of(streamNodes.get(3).getId(), streamNodes.get(2).getId()));

        IntermediateDataSet firstOutput = firstVertex.getProducedDataSets().get(0);
        Assertions.assertThat(adaptiveGraphManager.getProducerStreamNodeId(firstOutput.getId()))
                .isEqualTo(streamNodes.get(1).getId());
        Assertions.assertThat(adaptiveGraphManager.getOutputStreamEdges(firstOutput.getId()))
                .isEqualTo(streamNodes.get(1).getOutEdges());

        Set<Integer> forwardGroup =
                adaptiveGraphManager
                        .getStreamNodeForwardGroupByVertexId(firstVertex.getID())
                        .getVertexIds();
        Assertions.assertThat(forwardGroup.size()).isEqualTo(2);
        Assertions.assertThat(forwardGroup.contains(streamNodes.get(0).getId())).isTrue();
        Assertions.assertThat(forwardGroup.contains(streamNodes.get(1).getId())).isTrue();
    }

    /**
     * Checks that the lazily generated job graph is equivalent to the one produced by
     * {@link StreamingJobGraphGenerator} for the same pipeline.
     */
    @Test
    void testTheCorrectnessOfJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        buildKeyedMapPipeline(env);

        // getStreamGraph(false) is called twice so each generator gets its own StreamGraph
        // instance built from the same transformations.
        StreamGraph lazyStreamGraph = env.getStreamGraph(false);
        JobGraph lazyJobGraph = generateJobGraphInLazilyMode(lazyStreamGraph);

        StreamGraph eagerStreamGraph = env.getStreamGraph(false);
        JobGraph eagerJobGraph = StreamingJobGraphGenerator.createJobGraph(eagerStreamGraph);

        Assertions.assertThat(isJobGraphEquivalent(lazyJobGraph, eagerJobGraph)).isTrue();
    }

    /**
     * Builds a three-source multiple-input operator using
     * {@link ChainingStrategy#HEAD_WITH_SOURCES}, followed by a rebalanced sink, and expects the
     * resulting job graph to contain four vertices.
     */
    @Test
    void testSourceChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(100);
        env.setParallelism(100);

        MultipleInputTransformation<Long> transform =
                new MultipleInputTransformation<>(
                        "mit",
                        new JobGraphGeneratorTestBase.UnusedOperatorFactory(),
                        Types.LONG,
                        -1);

        // Only input1 is given the same parallelism as the environment; input2 and input3 use 1.
        Transformation<Long> input1 =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input1")
                        .setParallelism(100)
                        .getTransformation();
        Transformation<Long> input2 =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input2")
                        .setParallelism(1)
                        .getTransformation();
        Transformation<Long> input3 =
                env.fromSource(
                                new NumberSequenceSource(1L, 2L),
                                WatermarkStrategy.noWatermarks(),
                                "input3")
                        .setParallelism(1)
                        .getTransformation();

        transform.addInput(input1);
        transform.addInput(input2);
        transform.addInput(input3);
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);

        DataStream<Long> dataStream = new DataStream<>(env, transform);
        // NOTE(review): the rebalance presumably keeps the sink from being chained to the
        // multiple-input operator - confirm against the chaining rules.
        dataStream.rebalance().sinkTo(new DiscardingSink<>()).name("sink");
        env.addOperator(transform);

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setDynamic(true);
        JobGraph jobGraph = this.createJobGraph(streamGraph);

        Assertions.assertThat(jobGraph.getVerticesSortedTopologicallyFromSources()).hasSize(4);
    }

    /**
     * Checks, for a batch pipeline whose edge uses a {@link ForwardForConsecutiveHashPartitioner},
     * that the lazily generated job graph is equivalent to the one produced by
     * {@link StreamingJobGraphGenerator}.
     */
    @Test
    void testForwardForConsecutiveHashPartitionerChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(1);

        DataStreamSource<Integer> source = env.fromData(1, 2, 3);
        DataStream<Integer> forward =
                new DataStream<>(
                        env,
                        new PartitionTransformation<>(
                                source.getTransformation(),
                                new ForwardForConsecutiveHashPartitioner<>(
                                        new RebalancePartitioner<>()),
                                StreamExchangeMode.BATCH));
        forward.print();

        StreamGraph lazyStreamGraph = env.getStreamGraph(false);
        lazyStreamGraph.setDynamic(true);
        JobGraph lazyJobGraph = generateJobGraphInLazilyMode(lazyStreamGraph);

        StreamGraph eagerStreamGraph = env.getStreamGraph(false);
        eagerStreamGraph.setDynamic(true);
        JobGraph eagerJobGraph = StreamingJobGraphGenerator.createJobGraph(eagerStreamGraph);

        Assertions.assertThat(isJobGraphEquivalent(lazyJobGraph, eagerJobGraph)).isTrue();
    }

    /**
     * Adds the pipeline shared by several tests to {@code env}: six string elements, a map to
     * {@code Tuple2<String, String>}, a {@code keyBy} on field {@code f0}, an identity map and a
     * {@link DiscardingSink}.
     *
     * @param env environment the transformations are registered on
     */
    private static void buildKeyedMapPipeline(StreamExecutionEnvironment env) {
        // Anonymous classes (not lambdas) are kept so the Tuple2 type arguments stay visible
        // on the function's class.
        SingleOutputStreamOperator<Tuple2<String, String>> input =
                env.fromData("a", "b", "c", "d", "e", "f")
                        .map(
                                new MapFunction<String, Tuple2<String, String>>() {
                                    @Override
                                    public Tuple2<String, String> map(String value) {
                                        return new Tuple2<>(value, value);
                                    }
                                });
        SingleOutputStreamOperator<Tuple2<String, String>> result =
                input.keyBy(x -> x.f0)
                        .map(
                                new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                                    @Override
                                    public Tuple2<String, String> map(Tuple2<String, String> value) {
                                        return value;
                                    }
                                });
        result.sinkTo(new DiscardingSink<>());
    }

    /**
     * Creates an {@link AdaptiveGraphManager} for {@code streamGraph} (running its executor tasks
     * on the calling thread) and reports vertices as finished until the manager returns no more.
     *
     * @param streamGraph stream graph to translate
     * @return the manager's job graph after all returned vertices have been reported as finished
     */
    private static JobGraph generateJobGraphInLazilyMode(StreamGraph streamGraph) {
        AdaptiveGraphManager adaptiveGraphManager =
                new AdaptiveGraphManager(
                        Thread.currentThread().getContextClassLoader(),
                        streamGraph,
                        Runnable::run);
        JobGraph jobGraph = adaptiveGraphManager.getJobGraph();
        finishAllJobVertices(
                adaptiveGraphManager, jobGraph.getVerticesSortedTopologicallyFromSources());
        return jobGraph;
    }

    /**
     * Reports every vertex in {@code initialVertices} as finished, then does the same for the
     * vertices returned by those calls, repeating until a round returns no vertices.
     *
     * @param adaptiveGraphManager manager to notify
     * @param initialVertices vertices to report as finished in the first round
     */
    private static void finishAllJobVertices(
            AdaptiveGraphManager adaptiveGraphManager, List<JobVertex> initialVertices) {
        List<JobVertex> currentRound = initialVertices;
        while (!currentRound.isEmpty()) {
            List<JobVertex> nextRound = new ArrayList<>();
            for (JobVertex finished : currentRound) {
                nextRound.addAll(adaptiveGraphManager.onJobVertexFinished(finished.getID()));
            }
            currentRound = nextRound;
        }
    }

    /**
     * Asserts that two job graphs agree on their job-level settings and, vertex by vertex in
     * topological order, on the per-vertex properties checked below.
     *
     * <p>Any mismatch fails through an AssertJ assertion, so the method only ever returns
     * {@code true}; the return value exists so callers can wrap the call in an assertion.
     *
     * @param jobGraph1 first job graph
     * @param jobGraph2 second job graph
     * @return {@code true} if no assertion failed
     */
    private static boolean isJobGraphEquivalent(JobGraph jobGraph1, JobGraph jobGraph2) {
        Assertions.assertThat(jobGraph1.getJobConfiguration())
                .isEqualTo(jobGraph2.getJobConfiguration());
        Assertions.assertThat(jobGraph1.getJobType()).isEqualTo(jobGraph2.getJobType());
        Assertions.assertThat(jobGraph1.isDynamic()).isEqualTo(jobGraph2.isDynamic());
        Assertions.assertThat(jobGraph1.isApproximateLocalRecoveryEnabled())
                .isEqualTo(jobGraph2.isApproximateLocalRecoveryEnabled());
        Assertions.assertThat(jobGraph1.getSerializedExecutionConfig())
                .isEqualTo(jobGraph2.getSerializedExecutionConfig());
        // Checkpointing settings are compared through their string form.
        Assertions.assertThat(jobGraph1.getCheckpointingSettings().toString())
                .isEqualTo(jobGraph2.getCheckpointingSettings().toString());
        Assertions.assertThat(jobGraph1.getSavepointRestoreSettings())
                .isEqualTo(jobGraph2.getSavepointRestoreSettings());
        Assertions.assertThat(jobGraph1.getUserJars()).isEqualTo(jobGraph2.getUserJars());
        Assertions.assertThat(jobGraph1.getUserArtifacts()).isEqualTo(jobGraph2.getUserArtifacts());
        Assertions.assertThat(jobGraph1.getUserJarBlobKeys())
                .isEqualTo(jobGraph2.getUserJarBlobKeys());
        Assertions.assertThat(jobGraph1.getClasspaths()).isEqualTo(jobGraph2.getClasspaths());
        Assertions.assertThat(jobGraph1.getJobStatusHooks())
                .isEqualTo(jobGraph2.getJobStatusHooks());

        List<JobVertex> vertices1 = jobGraph1.getVerticesSortedTopologicallyFromSources();
        List<JobVertex> vertices2 = jobGraph2.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(vertices1.size()).isEqualTo(vertices2.size());

        // Start at index 0: every vertex, including the first one, must be compared.
        for (int i = 0; i < vertices1.size(); ++i) {
            JobVertex vertex1 = vertices1.get(i);
            JobVertex vertex2 = vertices2.get(i);
            Assertions.assertThat(vertex1.getID()).isEqualTo(vertex2.getID());
            Assertions.assertThat(vertex1.getInputs().size()).isEqualTo(vertex2.getInputs().size());
            Assertions.assertThat(vertex1.getParallelism()).isEqualTo(vertex2.getParallelism());
            Assertions.assertThat(vertex1.getMaxParallelism())
                    .isEqualTo(vertex2.getMaxParallelism());
            Assertions.assertThat(vertex1.getMinResources()).isEqualTo(vertex2.getMinResources());
            Assertions.assertThat(vertex1.getPreferredResources())
                    .isEqualTo(vertex2.getPreferredResources());
            Assertions.assertThat(vertex1.getInvokableClassName())
                    .isEqualTo(vertex2.getInvokableClassName());
            Assertions.assertThat(vertex1.getName()).isEqualTo(vertex2.getName());
            Assertions.assertThat(vertex1.getOperatorName()).isEqualTo(vertex2.getOperatorName());
            Assertions.assertThat(vertex1.isSupportsConcurrentExecutionAttempts())
                    .isEqualTo(vertex2.isSupportsConcurrentExecutionAttempts());
            Assertions.assertThat(vertex1.isAnyOutputBlocking())
                    .isEqualTo(vertex2.isAnyOutputBlocking());
            Assertions.assertThat(vertex1.isParallelismConfigured())
                    .isEqualTo(vertex2.isParallelismConfigured());
        }
        return true;
    }
}

