/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTestBase;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.TestingStateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

public class LocalRecoveryTest
extends AdaptiveSchedulerTestBase {
    @Test
    void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception {
        JobGraph jobGraph = LocalRecoveryTest.createJobGraphWithCheckpointing(JOB_VERTEX);
        DeclarativeSlotPool slotPool = LocalRecoveryTest.getSlotPoolWithFreeSlots(4);
        ArrayList<JobAllocationsInformation> capturedAllocations = new ArrayList<JobAllocationsInformation>();
        boolean localRecoveryEnabled = true;
        String executionTarget = "local";
        boolean minimalTaskManagerPreferred = false;
        TestingSlotAllocator slotAllocator = TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator((SlotAllocator)AdaptiveSchedulerFactory.createSlotSharingSlotAllocator((DeclarativeSlotPool)slotPool, (boolean)true, (String)"local", (boolean)false), capturedAllocations);
        this.scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool(slotPool).setSlotAllocator(slotAllocator).setStateTransitionManagerFactory(LocalRecoveryTest.createAutoAdvanceStateTransitionManagerFactory()).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)).build();
        this.startTestInstanceInMainThread();
        SchedulerTestingUtils.waitForJobStatusRunning((SchedulerNG)this.scheduler);
        this.runInMainThread(() -> SchedulerTestingUtils.setAllExecutionsToRunning((SchedulerNG)this.scheduler));
        CompletableFuture completedCheckpointFuture = this.supplyInMainThread(() -> this.scheduler.triggerCheckpoint(CheckpointType.FULL));
        SchedulerTestingUtils.waitForCheckpointInProgress((SchedulerNG)this.scheduler);
        Map<OperatorID, OperatorSubtaskState> operatorStates = LocalRecoveryTest.generateFakeKeyedManagedStateForAllOperators(jobGraph);
        this.runInMainThread(() -> SchedulerTestingUtils.acknowledgePendingCheckpoint((SchedulerNG)this.scheduler, 1, operatorStates));
        CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint)completedCheckpointFuture.join();
        SchedulerTestingUtils.waitForCompletedCheckpoint((SchedulerNG)this.scheduler);
        ((ObjectAssert)Assertions.assertThat((Object)completedCheckpoint).withFailMessage("Checkpoint shouldn't be null", new Object[0])).isNotNull();
        List executionAttemptIds = this.supplyInMainThread(this::getExecutionAttemptIDS);
        Assertions.assertThat((List)executionAttemptIds).hasSize(4);
        this.runInMainThread(() -> this.scheduler.updateTaskExecutionState(new TaskExecutionState((ExecutionAttemptID)executionAttemptIds.get(0), ExecutionState.FAILED, (Throwable)new Exception("Test exception for local recovery"))));
        this.runInMainThread(() -> {
            for (int idx = 1; idx < executionAttemptIds.size(); ++idx) {
                this.scheduler.updateTaskExecutionState(new TaskExecutionState((ExecutionAttemptID)executionAttemptIds.get(idx), ExecutionState.CANCELED));
            }
        });
        SchedulerTestingUtils.waitForJobStatusRunning((SchedulerNG)this.scheduler);
        Assertions.assertThat(capturedAllocations).hasSize(2);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)((JobAllocationsInformation)capturedAllocations.get(0)).isEmpty());
        Assertions.assertThat((long)((JobAllocationsInformation.VertexAllocationInformation)((JobAllocationsInformation)capturedAllocations.get((int)1)).getAllocations((JobVertexID)LocalRecoveryTest.JOB_VERTEX.getID()).get((int)0)).stateSizeInBytes).isGreaterThan(0L);
    }

    private List<ExecutionAttemptID> getExecutionAttemptIDS() {
        Optional<ExecutionGraph> maybeExecutionGraph = this.scheduler.getState().as(StateWithExecutionGraph.class).map(StateWithExecutionGraph::getExecutionGraph);
        Assertions.assertThat(maybeExecutionGraph).isNotEmpty();
        ExecutionVertex[] taskVertices = Objects.requireNonNull(maybeExecutionGraph.get().getJobVertex(JOB_VERTEX.getID())).getTaskVertices();
        return Arrays.stream(taskVertices).map(ExecutionVertex::getCurrentExecutionAttempt).map(Execution::getAttemptId).collect(Collectors.toList());
    }

    private static AdaptiveScheduler.StateTransitionManagerFactory createAutoAdvanceStateTransitionManagerFactory() {
        return (context, ignoredClock, ignoredCooldown, ignoredResourceStabilizationTimeout, ignoredMaxTriggerDelay) -> TestingStateTransitionManager.withOnTriggerEventOnly(() -> {
            if (context instanceof WaitingForResources) {
                context.transitionToSubsequentState();
            }
        });
    }

    private static Map<OperatorID, OperatorSubtaskState> generateFakeKeyedManagedStateForAllOperators(JobGraph jobGraph) throws IOException {
        HashMap<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<OperatorID, OperatorSubtaskState>();
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            KeyGroupsStateHandle keyedStateHandle = CheckpointCoordinatorTestingUtils.generateKeyGroupState(jobVertex.getID(), KeyGroupRange.of((int)0, (int)(jobGraph.getMaximumParallelism() - 1)), false);
            for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) {
                operatorStates.put(operatorId.getGeneratedOperatorID(), OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyedStateHandle).build());
            }
        }
        return operatorStates;
    }
}

