/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphDeploymentWithBlobCacheTest;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Variant of {@link DefaultExecutionGraphDeploymentWithBlobCacheTest} whose permanent blob cache
 * is guarded by a {@link BlobCacheSizeTracker} created with a size limit of {@code 1}.
 *
 * <p>NOTE(review): the limit is presumably expressed in bytes, which would make the cache too
 * small to retain any real blob; confirm against {@link BlobCacheSizeTracker}.
 */
class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {

    /**
     * Replaces the parent's blob setup: starts a {@link BlobServer}, uses it as the blob writer,
     * and creates a {@link PermanentBlobCache} connected to that server with a size tracker
     * limited to {@code 1}.
     *
     * @throws IOException if creating the folders or the blob server/cache fails
     */
    @Override
    @BeforeEach
    public void setupBlobServer() throws IOException {
        Configuration config = new Configuration();
        // NOTE(review): a minimum offload size of 0 presumably forces every payload to be
        // offloaded to the blob server - confirm against BlobServerOptions.
        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);

        this.blobServer =
                new BlobServer(
                        config,
                        TempDirUtils.newFolder((Path) this.temporaryFolder),
                        (BlobStore) new VoidBlobStore());
        this.blobServer.start();
        this.blobWriter = this.blobServer;

        InetSocketAddress serverAddress =
                new InetSocketAddress("localhost", this.blobServer.getPort());
        BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L);
        this.blobCache =
                new PermanentBlobCache(
                        config,
                        TempDirUtils.newFolder((Path) this.temporaryFolder),
                        (BlobView) new VoidBlobStore(),
                        serverAddress,
                        blobCacheSizeTracker);
    }

    /**
     * Deploys every execution vertex of a 4-vertex, parallelism-10 graph one after another. For
     * each submitted {@link TaskDeploymentDescriptor} the big data is loaded through the blob
     * cache, and the number of input gates (plus the shuffle descriptors of the first gate) is
     * checked against the vertex's consumed partition groups.
     */
    @Test
    void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception {
        final int numberOfVertices = 4;
        final int parallelism = 10;

        final ExecutionGraph eg = createAndSetupExecutionGraph(numberOfVertices, parallelism);

        final SimpleAckingTaskManagerGateway taskManagerGateway =
                new SimpleAckingTaskManagerGateway();
        // Capacity equals the total number of execution vertices, and every descriptor is taken
        // right after its deployment, so offer() below never finds the queue full.
        final ArrayBlockingQueue<TaskDeploymentDescriptor> tdds =
                new ArrayBlockingQueue<>(numberOfVertices * parallelism);
        taskManagerGateway.setSubmitConsumer(
                FunctionUtils.uncheckedConsumer(
                        taskDeploymentDescriptor -> {
                            taskDeploymentDescriptor.loadBigData(this.blobCache);
                            tdds.offer(taskDeploymentDescriptor);
                        }));

        for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
            for (ExecutionVertex ev : ejv.getTaskVertices()) {
                Assertions.assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.CREATED);

                LogicalSlot slot =
                        new TestingLogicalSlotBuilder()
                                .setTaskManagerGateway(taskManagerGateway)
                                .createTestingLogicalSlot();
                Execution execution = ev.getCurrentExecutionAttempt();
                execution.transitionState(ExecutionState.SCHEDULED);
                execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
                ev.deployToSlot(slot);
                Assertions.assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);

                // Blocks until the submit consumer has handed over this vertex's descriptor.
                TaskDeploymentDescriptor tdd = tdds.take();
                Assertions.assertThat(tdd).isNotNull();

                List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates();
                Assertions.assertThat(igdds).hasSize(ev.getAllConsumedPartitionGroups().size());

                // Only the first input gate is compared against its consumed partition group.
                if (!igdds.isEmpty()) {
                    checkShuffleDescriptors(igdds.get(0), ev.getConsumedPartitionGroup(0));
                }
            }
        }
    }

    /**
     * Builds a batch job graph consisting of a linear chain of {@code numberOfVertices} vertices
     * (each connected to its predecessor with a pointwise, blocking data set), creates an
     * execution graph for it that uses this test's blob writer, and starts it.
     *
     * @param numberOfVertices number of job vertices in the chain
     * @param parallelism parallelism applied to every job vertex
     * @return the started execution graph
     */
    private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, int parallelism)
            throws JobException, JobExecutionException {
        final List<JobVertex> vertices = new ArrayList<>();
        for (int i = 0; i < numberOfVertices; ++i) {
            JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new JobVertexID());
            vertex.setParallelism(parallelism);
            vertex.setInvokableClass(BatchTask.class);
            vertices.add(vertex);
        }

        // Chain the vertices: v1 -> v2 -> ... -> vN.
        for (int i = 1; i < numberOfVertices; ++i) {
            vertices.get(i)
                    .connectNewDataSetAsInput(
                            vertices.get(i - 1),
                            DistributionPattern.POINTWISE,
                            ResultPartitionType.BLOCKING);
        }

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices.toArray(new JobVertex[0]));
        final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
        final DefaultExecutionGraph eg =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setBlobWriter(this.blobWriter)
                        .build(executor);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return eg;
    }

    /**
     * Asserts that the shuffle descriptors of the given input gate reference, position by
     * position, the partition ids of the consumed partition group in its iteration order.
     *
     * @param igdd input gate deployment descriptor whose shuffle descriptors are checked
     * @param consumedPartitionGroup expected partition ids, in order
     */
    private static void checkShuffleDescriptors(
            InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup consumedPartitionGroup) {
        int idx = 0;
        for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
            Assertions.assertThat(
                            (Object)
                                    igdd.getShuffleDescriptors()[idx++]
                                            .getResultPartitionID()
                                            .getPartitionId())
                    .isEqualTo(consumedPartitionId);
        }
    }
}

