/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.utils.ExecutionUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class UpdatePartitionConsumersTest
extends TestLogger {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private static final long TIMEOUT = 5000L;
    private JobGraph jobGraph;
    private JobVertex v1;
    private JobVertex v2;
    private JobVertex v3;
    private JobVertex v4;

    @Before
    public void setUp() {
        this.buildJobGraphWithBlockingEdgeWithinRegion();
    }

    private void buildJobGraphWithBlockingEdgeWithinRegion() {
        this.v1 = new JobVertex("v1");
        this.v1.setInvokableClass(AbstractInvokable.class);
        this.v1.setParallelism(1);
        this.v2 = new JobVertex("v2");
        this.v2.setInvokableClass(AbstractInvokable.class);
        this.v2.setParallelism(1);
        this.v3 = new JobVertex("v3");
        this.v3.setInvokableClass(AbstractInvokable.class);
        this.v3.setParallelism(1);
        this.v4 = new JobVertex("v4");
        this.v4.setInvokableClass(AbstractInvokable.class);
        this.v4.setParallelism(1);
        JobVertexConnectionUtils.connectNewDataSetAsInput(this.v2, this.v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(this.v3, this.v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(this.v4, this.v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(this.v4, this.v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        this.jobGraph = JobGraphTestUtils.batchJobGraph(this.v1, this.v2, this.v3, this.v4);
    }

    @Test
    public void testUpdatePartitionConsumers() throws Exception {
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        ScheduledExecutorService mainThreadExecutorService = Executors.newSingleThreadScheduledExecutor();
        ComponentMainThreadExecutor singleThreadMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(mainThreadExecutorService);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(this.jobGraph, singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway)).build();
        ExecutionVertex ev1 = scheduler.getExecutionVertex(new ExecutionVertexID(this.v1.getID(), 0));
        ExecutionVertex ev2 = scheduler.getExecutionVertex(new ExecutionVertexID(this.v2.getID(), 0));
        ExecutionVertex ev3 = scheduler.getExecutionVertex(new ExecutionVertexID(this.v3.getID(), 0));
        ExecutionVertex ev4 = scheduler.getExecutionVertex(new ExecutionVertexID(this.v4.getID(), 0));
        CompletableFuture ev4TddFuture = new CompletableFuture();
        taskManagerGateway.setSubmitConsumer(tdd -> {
            if (tdd.getExecutionAttemptId().equals((Object)ev4.getCurrentExecutionAttempt().getAttemptId())) {
                ev4TddFuture.complete(tdd);
            }
        });
        CompletableFuture.runAsync(() -> ((SchedulerBase)scheduler).startScheduling(), (Executor)singleThreadMainThreadExecutor).join();
        ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(ev1, ev2, ev3, ev4);
        Assert.assertThat((Object)ev1.getExecutionState(), (Matcher)CoreMatchers.is((Object)ExecutionState.DEPLOYING));
        Assert.assertThat((Object)ev2.getExecutionState(), (Matcher)CoreMatchers.is((Object)ExecutionState.DEPLOYING));
        Assert.assertThat((Object)ev3.getExecutionState(), (Matcher)CoreMatchers.is((Object)ExecutionState.DEPLOYING));
        Assert.assertThat((Object)ev4.getExecutionState(), (Matcher)CoreMatchers.is((Object)ExecutionState.DEPLOYING));
        mainThreadExecutorService.execute(() -> this.lambda$testUpdatePartitionConsumers$1((SchedulerBase)scheduler, ev1, ev2, ev3, ev4));
        InputGateDeploymentDescriptor ev4Igdd2 = (InputGateDeploymentDescriptor)((TaskDeploymentDescriptor)ev4TddFuture.get(5000L, TimeUnit.MILLISECONDS)).getInputGates().get(1);
        Assert.assertThat((Object)ev4Igdd2.getShuffleDescriptors()[0], (Matcher)CoreMatchers.instanceOf(UnknownShuffleDescriptor.class));
        CompletableFuture updatePartitionFuture = new CompletableFuture();
        taskManagerGateway.setUpdatePartitionsConsumer((TriConsumer<ExecutionAttemptID, Iterable<PartitionInfo>, Duration>)((TriConsumer)(attemptId, partitionInfos, time) -> {
            Assert.assertThat((Object)attemptId, (Matcher)CoreMatchers.equalTo((Object)ev4.getCurrentExecutionAttempt().getAttemptId()));
            List partitionInfoList = IterableUtils.toStream((Iterable)partitionInfos).collect(Collectors.toList());
            Assert.assertThat(partitionInfoList, (Matcher)Matchers.hasSize((int)1));
            PartitionInfo partitionInfo = (PartitionInfo)partitionInfoList.get(0);
            Assert.assertThat((Object)partitionInfo.getIntermediateDataSetID(), (Matcher)CoreMatchers.equalTo((Object)((IntermediateDataSet)this.v3.getProducedDataSets().get(0)).getId()));
            Assert.assertThat((Object)partitionInfo.getShuffleDescriptor(), (Matcher)CoreMatchers.instanceOf(NettyShuffleDescriptor.class));
            updatePartitionFuture.complete(null);
        }));
        mainThreadExecutorService.execute(() -> this.lambda$testUpdatePartitionConsumers$3((SchedulerBase)scheduler, ev1, ev3));
        updatePartitionFuture.get(5000L, TimeUnit.MILLISECONDS);
    }

    private void updateState(SchedulerBase scheduler, ExecutionVertex vertex, ExecutionState state) {
        scheduler.updateTaskExecutionState(new TaskExecutionState(vertex.getCurrentExecutionAttempt().getAttemptId(), state));
    }

    private /* synthetic */ void lambda$testUpdatePartitionConsumers$3(SchedulerBase scheduler, ExecutionVertex ev1, ExecutionVertex ev3) {
        this.updateState(scheduler, ev1, ExecutionState.FINISHED);
        this.updateState(scheduler, ev3, ExecutionState.FINISHED);
    }

    private /* synthetic */ void lambda$testUpdatePartitionConsumers$1(SchedulerBase scheduler, ExecutionVertex ev1, ExecutionVertex ev2, ExecutionVertex ev3, ExecutionVertex ev4) {
        this.updateState(scheduler, ev1, ExecutionState.INITIALIZING);
        this.updateState(scheduler, ev1, ExecutionState.RUNNING);
        this.updateState(scheduler, ev2, ExecutionState.INITIALIZING);
        this.updateState(scheduler, ev2, ExecutionState.RUNNING);
        this.updateState(scheduler, ev3, ExecutionState.INITIALIZING);
        this.updateState(scheduler, ev3, ExecutionState.RUNNING);
        this.updateState(scheduler, ev4, ExecutionState.INITIALIZING);
        this.updateState(scheduler, ev4, ExecutionState.RUNNING);
    }
}

