package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Tests for {@link DefaultAdaptiveExecutionHandler}. */
class DefaultAdaptiveExecutionHandlerTest {

    /**
     * JUnit 5 extension that supplies the {@link ScheduledExecutorService} passed to each
     * {@link DefaultAdaptiveExecutionHandler} built by the two-argument
     * {@code createAdaptiveExecutionHandler} helper.
     */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest$TestingStreamGraphOptimizerStrategy.class */
    public static final class TestingStreamGraphOptimizerStrategy implements StreamGraphOptimizationStrategy {
        private static final Set<String> convertToReBalanceEdgeIds = new HashSet();

        public boolean onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext streamGraphContext) {
            List finishedStreamNodeIds = operatorsFinished.getFinishedStreamNodeIds();
            ArrayList arrayList = new ArrayList();
            Iterator it = finishedStreamNodeIds.iterator();
            while (it.hasNext()) {
                for (ImmutableStreamEdge immutableStreamEdge : streamGraphContext.getStreamGraph().getStreamNode((Integer) it.next()).getOutEdges()) {
                    if (convertToReBalanceEdgeIds.contains(immutableStreamEdge.getEdgeId())) {
                        StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo = new StreamEdgeUpdateRequestInfo(immutableStreamEdge.getEdgeId(), Integer.valueOf(immutableStreamEdge.getSourceId()), Integer.valueOf(immutableStreamEdge.getTargetId()));
                        streamEdgeUpdateRequestInfo.withOutputPartitioner(new RebalancePartitioner());
                        arrayList.add(streamEdgeUpdateRequestInfo);
                    }
                }
            }
            return streamGraphContext.modifyStreamEdge(arrayList);
        }
    }

    // Explicit no-arg constructor with an empty body; it does nothing beyond what the
    // implicit default constructor of this package-private class would do.
    DefaultAdaptiveExecutionHandlerTest() {
    }

    /**
     * A freshly created handler must expose a non-null job graph that holds exactly one vertex,
     * and that vertex's name must contain "Source".
     */
    @Test
    void testGetJobGraph() throws DynamicCodeLoadingException {
        DefaultAdaptiveExecutionHandler handler = createAdaptiveExecutionHandler();
        JobGraph initialGraph = handler.getJobGraph();

        Assertions.assertThat(initialGraph).isNotNull();
        Assertions.assertThat(initialGraph.getNumberOfVertices()).isOne();

        JobVertex onlyVertex = initialGraph.getVertices().iterator().next();
        Assertions.assertThat(onlyVertex.getName()).contains("Source");
    }

    /**
     * Sends vertex-finished events to the handler one at a time and checks what the registered
     * {@link JobGraphUpdateListener} is told after each of them: first a vertex whose name contains
     * "Map" together with the value one, then a vertex whose name contains "Sink" together with
     * the value zero.
     */
    @Test
    void testHandleJobEvent() throws DynamicCodeLoadingException {
        // Everything the listener receives is recorded here for the assertions below.
        List<JobVertex> addedVertices = new ArrayList<>();
        AtomicInteger lastReportedCount = new AtomicInteger();
        DefaultAdaptiveExecutionHandler handler = createAdaptiveExecutionHandler((newVertices, count) -> {
            addedVertices.addAll(newVertices);
            lastReportedCount.set(count);
        }, createStreamGraph());

        // Finish the source vertex: the listener should learn about the "Map" vertex.
        JobVertex sourceVertex = handler.getJobGraph().getVerticesSortedTopologicallyFromSources().stream()
                .filter(vertex -> vertex.getName().contains("Source"))
                .findFirst()
                .get();
        handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(sourceVertex.getID(), Collections.emptyMap()));
        Assertions.assertThat(addedVertices).hasSize(1);
        Assertions.assertThat(addedVertices.get(0).getName()).contains("Map");
        Assertions.assertThat(lastReportedCount.get()).isOne();

        // Finish the vertex that was just added: the listener should learn about the "Sink" vertex.
        handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(addedVertices.get(0).getID(), Collections.emptyMap()));
        Assertions.assertThat(addedVertices).hasSize(2);
        Assertions.assertThat(addedVertices.get(1).getName()).contains("Sink");
        Assertions.assertThat(lastReportedCount.get()).isZero();
    }

    /**
     * Registers {@link TestingStreamGraphOptimizerStrategy} through the job configuration, marks
     * the first outgoing edge of the "Source" and of the "Map" stream node for conversion, and
     * then checks the ship strategy of the job edge produced by each job vertex once that vertex
     * has been reported as finished: "forward" for the first vertex, "rebalance" for the second.
     */
    @Test
    void testOptimizeStreamGraph() throws DynamicCodeLoadingException {
        StreamGraph streamGraph = createStreamGraph();
        StreamNode sourceNode = streamGraph.getStreamNodes().stream()
                .filter(node -> node.getOperatorName().contains("Source"))
                .findFirst()
                .get();
        StreamNode mapNode = streamGraph.getStreamNodes().stream()
                .filter(node -> node.getOperatorName().contains("Map"))
                .findFirst()
                .get();

        // Sanity-check the partitioners the pipeline was built with.
        StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
        Assertions.assertThat(sourceOutEdge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
        StreamEdge mapOutEdge = mapNode.getOutEdges().get(0);
        Assertions.assertThat(mapOutEdge.getPartitioner()).isInstanceOf(RescalePartitioner.class);

        streamGraph.getJobConfiguration().set(
                StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY,
                Collections.singletonList(TestingStreamGraphOptimizerStrategy.class.getName()));
        // The id set is static and therefore outlives a single test method; reset it so that
        // only the edges of this stream graph are registered.
        TestingStreamGraphOptimizerStrategy.convertToReBalanceEdgeIds.clear();
        TestingStreamGraphOptimizerStrategy.convertToReBalanceEdgeIds.add(sourceOutEdge.getEdgeId());
        TestingStreamGraphOptimizerStrategy.convertToReBalanceEdgeIds.add(mapOutEdge.getEdgeId());

        DefaultAdaptiveExecutionHandler handler = createAdaptiveExecutionHandler((newVertices, count) -> {
        }, streamGraph);
        JobGraph jobGraph = handler.getJobGraph();

        // First vertex of the job graph finishes.
        JobVertex firstVertex = jobGraph.getVertices().iterator().next();
        handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(firstVertex.getID(), Collections.emptyMap()));
        List<JobEdge> firstVertexConsumers = firstVertex.getProducedDataSets().get(0).getConsumers();
        Assertions.assertThat(firstVertexConsumers).hasSize(1);
        Assertions.assertThat(firstVertexConsumers.get(0).getShipStrategyName()).isEqualToIgnoringCase("forward");

        // The iterator is obtained only now, after the first event has been handled, and the
        // second vertex it yields is finished next.
        Iterator<JobVertex> vertices = jobGraph.getVertices().iterator();
        vertices.next();
        JobVertex secondVertex = vertices.next();
        handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(secondVertex.getID(), Collections.emptyMap()));
        List<JobEdge> secondVertexConsumers = secondVertex.getProducedDataSets().get(0).getConsumers();
        Assertions.assertThat(secondVertexConsumers).hasSize(1);
        Assertions.assertThat(secondVertexConsumers.get(0).getShipStrategyName()).isEqualToIgnoringCase("rebalance");
    }

    /**
     * Checks that the handler initially reports the source vertex's own parallelism, and that
     * after a parallelism has been decided for the source and the source has finished, the
     * vertex whose name contains "Map" is reported with that decided value.
     */
    @Test
    void testGetInitialParallelismAndNotifyJobVertexParallelismDecided() throws DynamicCodeLoadingException {
        DefaultAdaptiveExecutionHandler handler = createAdaptiveExecutionHandler((newVertices, count) -> {
        }, createStreamGraph());
        JobGraph jobGraph = handler.getJobGraph();

        JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().stream()
                .filter(vertex -> vertex.getName().contains("Source"))
                .findFirst()
                .get();
        Assertions.assertThat(handler.getInitialParallelism(sourceVertex.getID())).isEqualTo(sourceVertex.getParallelism());

        // Random value in the range 1..8 inclusive.
        int decidedParallelism = 1 + new Random().nextInt(8);
        handler.notifyJobVertexParallelismDecided(sourceVertex.getID(), decidedParallelism);
        handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(sourceVertex.getID(), Collections.emptyMap()));

        JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().stream()
                .filter(vertex -> vertex.getName().contains("Map"))
                .findFirst()
                .get();
        Assertions.assertThat(handler.getInitialParallelism(mapVertex.getID())).isEqualTo(decidedParallelism);
    }

    /**
     * Convenience overload: builds a handler over a newly created stream graph, registering a
     * listener that ignores every update.
     */
    private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler() throws DynamicCodeLoadingException {
        JobGraphUpdateListener ignoreUpdates = (newVertices, count) -> {
        };
        StreamGraph freshGraph = createStreamGraph();
        return createAdaptiveExecutionHandler(ignoreUpdates, freshGraph);
    }

    /**
     * Builds the stream graph shared by the tests: a sequence source named "Source", forwarded
     * into an identity map named "Map", rescaled into a print sink named "Sink". Operator
     * chaining is disabled and the environment runs in batch mode.
     */
    private StreamGraph createStreamGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(0L, 1L)
                .name("Source")
                .forward()
                .map(value -> value)
                .name("Map")
                .rescale()
                .print()
                .name("Sink")
                .disableChaining();

        // Kept after the pipeline definition, exactly where the original sets it.
        env.setParallelism(1);
        return env.getStreamGraph();
    }

    /**
     * Creates a handler for the given stream graph, using this test class's class loader and the
     * executor from {@code EXECUTOR_RESOURCE}, and registers the given listener on it.
     *
     * @param jobGraphUpdateListener listener to register on the new handler
     * @param streamGraph stream graph the handler is created for
     * @return the handler with the listener already registered
     */
    private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler(JobGraphUpdateListener jobGraphUpdateListener, StreamGraph streamGraph) throws DynamicCodeLoadingException {
        ClassLoader testClassLoader = getClass().getClassLoader();
        ScheduledExecutorService executor = EXECUTOR_RESOURCE.getExecutor();
        DefaultAdaptiveExecutionHandler handler = new DefaultAdaptiveExecutionHandler(testClassLoader, streamGraph, executor);
        handler.registerJobGraphUpdateListener(jobGraphUpdateListener);
        return handler;
    }

    // The decompiled copy of the compiler-generated $deserializeLambda$(SerializedLambda) method
    // that used to sit here has been dropped. It was not valid Java source (it assigned -1 to a
    // boolean, switched on that boolean, and returned a bare lambda as Object), and nothing in
    // this class calls it: javac emits this method itself for classes that contain serializable
    // lambdas, such as the map function in createStreamGraph().
}
