package org.apache.flink.runtime.scheduler;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.class */
public class SsgNetworkMemoryCalculationUtilsTest {
    private static final TestShuffleMaster SHUFFLE_MASTER = new TestShuffleMaster();
    private static final ResourceProfile DEFAULT_RESOURCE = ResourceProfile.fromResources(1.0d, 100);
    private JobGraph jobGraph;
    private ExecutionGraph executionGraph;
    private List<SlotSharingGroup> slotSharingGroups;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest$TestShuffleMaster.class */
    private static class TestShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
        private TestShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        }

        public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
            return new MemorySize(computeRequiredShuffleMemoryBytes(taskInputsOutputsDescriptor.getInputChannelNums().values().stream().mapToInt((v0) -> {
                return v0.intValue();
            }).sum(), taskInputsOutputsDescriptor.getSubpartitionNums().values().stream().mapToInt((v0) -> {
                return v0.intValue();
            }).sum()));
        }

        static int computeRequiredShuffleMemoryBytes(int i, int i2) {
            return (i * 10000) + i2;
        }
    }

    @Test
    public void testGenerateEnrichedResourceProfile() throws Exception {
        setup(DEFAULT_RESOURCE);
        this.slotSharingGroups.forEach(slotSharingGroup -> {
            Map allVertices = this.executionGraph.getAllVertices();
            allVertices.getClass();
            SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(slotSharingGroup, (v1) -> {
                return r1.get(v1);
            }, SHUFFLE_MASTER);
        });
        Assert.assertEquals(new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2) + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)), this.slotSharingGroups.get(0).getResourceProfile().getNetworkMemory());
        Assert.assertEquals(new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 0)), this.slotSharingGroups.get(1).getResourceProfile().getNetworkMemory());
    }

    @Test
    public void testGenerateUnknownResourceProfile() throws Exception {
        setup(ResourceProfile.UNKNOWN);
        this.slotSharingGroups.forEach(slotSharingGroup -> {
            Map allVertices = this.executionGraph.getAllVertices();
            allVertices.getClass();
            SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(slotSharingGroup, (v1) -> {
                return r1.get(v1);
            }, SHUFFLE_MASTER);
        });
        Iterator<SlotSharingGroup> it = this.slotSharingGroups.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(ResourceProfile.UNKNOWN, it.next().getResourceProfile());
        }
    }

    private void setup(ResourceProfile resourceProfile) throws Exception {
        this.slotSharingGroups = Arrays.asList(new SlotSharingGroup(), new SlotSharingGroup());
        Iterator<SlotSharingGroup> it = this.slotSharingGroups.iterator();
        while (it.hasNext()) {
            it.next().setResourceProfile(resourceProfile);
        }
        this.jobGraph = createJobGraph(this.slotSharingGroups);
        this.executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(this.jobGraph).build();
    }

    private static JobGraph createJobGraph(List<SlotSharingGroup> list) {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(4);
        jobVertex.setSlotSharingGroup(list.get(0));
        JobVertex jobVertex2 = new JobVertex("map");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(5);
        jobVertex2.setSlotSharingGroup(list.get(0));
        JobVertex jobVertex3 = new JobVertex("sink");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(6);
        jobVertex3.setSlotSharingGroup(list.get(1));
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2, jobVertex3);
    }
}
