package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

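/**
 * Abstract collection of slot manager test cases that are driven through the
 * {@code FineGrainedSlotManagerTestBase.Context} harness.
 *
 * <p>Decompiled from
 * {@code org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.class}.
 */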
public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManagerTestBase {

    /**
     * Order of task executor registration relative to the requirement declaration, as consumed by
     * {@code testRequirementDeclaration}.
     *
     * <p>NOTE(review): the decompiler reports that this enum was originally {@code private}.
     */
    public enum RequirementDeclarationScenario {
        /** The task executor is registered before the resource requirements are processed. */
        TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION,
        /** The task executor is registered after the resource requirements are processed. */
        TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION
    }

    /**
     * Point in time at which {@code testResourceCanBeAllocatedForDifferentJobAfterFree} declares
     * its second set of resource requirements.
     *
     * <p>NOTE(review): the decompiler reports that this enum was originally {@code private}.
     */
    public enum SecondRequirementDeclarationTime {
        /** The second requirements are declared before the first slot is freed. */
        BEFORE_FREE,
        /** The second requirements are declared after the first slot is freed. */
        AFTER_FREE
    }

    /**
     * Declares a single-slot requirement without registering any task executor in this test and
     * asserts that the slot manager invokes the allocate-resource callback.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testRequirementDeclarationWithoutFreeSlotsTriggersWorkerAllocation() throws Exception {
        final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
        // Completed with whatever argument the slot manager passes to the allocate-resource callback.
        final CompletableFuture<Object> allocateResourceFuture = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceActionsBuilder.setAllocateResourceConsumer(allocateResourceFuture::complete);
                runTest(() -> {
                    runInMainThread(() -> getSlotManager().processResourceRequirements(resourceRequirements));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(allocateResourceFuture);
                });
            }
        };
    }

    /**
     * Runs {@code testRequirementDeclaration} with the task executor registered before the
     * resource requirements are declared.
     */
    @Test
    public void testRequirementDeclarationWithFreeResource() throws Exception {
        testRequirementDeclaration(RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION);
    }

    /**
     * Runs {@code testRequirementDeclaration} with the task executor registered after the
     * resource requirements are declared.
     */
    @Test
    public void testRequirementDeclarationWithPendingResource() throws Exception {
        testRequirementDeclaration(RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION);
    }

    /**
     * Declares a requirement for one slot of the default slot profile and registers a single task
     * executor either before or after that declaration, depending on the scenario.
     *
     * <p>Asserts that the slot request received by the task executor gateway carries the expected
     * slot id, job id, resource profile, target address and resource manager id, and that the task
     * manager tracker holds a slot under the requested allocation id.
     *
     * @param requirementDeclarationScenario whether the task executor is registered before or after
     *     the requirements are processed
     * @throws Exception if the test harness or one of the assertions fails
     */
    private void testRequirementDeclaration(final RequirementDeclarationScenario requirementDeclarationScenario) throws Exception {
        final ResourceID resourceId = ResourceID.generate();
        final JobID jobId = new JobID();
        final SlotID slotId = SlotID.getDynamicSlotID(resourceId);
        final String targetAddress = "localhost";
        final ResourceRequirements requirements = ResourceRequirements.create(jobId, targetAddress, Collections.singleton(ResourceRequirement.create(DEFAULT_SLOT_RESOURCE_PROFILE, 1)));
        // Receives the full argument tuple of the slot request sent to the task executor gateway.
        final CompletableFuture requestFuture = new CompletableFuture();
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceId, new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            requestFuture.complete(tuple6);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway());
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    if (requirementDeclarationScenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION) {
                        runInMainThread(() -> {
                            getSlotManager().registerTaskManager(taskExecutorConnection, new SlotReport(), FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        });
                    }
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(requirements);
                    });
                    if (requirementDeclarationScenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION) {
                        runInMainThread(() -> {
                            getSlotManager().registerTaskManager(taskExecutorConnection, new SlotReport(), FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        });
                    }

                    // Wait for the slot request once and reuse the result for both assertions.
                    final Object requestedSlot = FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestFuture);
                    // The allocation id is chosen by the slot manager, so it is read back from the request.
                    final AllocationID allocationId = (AllocationID) ((Tuple6) requestedSlot).f2;

                    Assert.assertThat(requestedSlot, Matchers.is(Matchers.equalTo(Tuple6.of(slotId, jobId, allocationId, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE, targetAddress, getResourceManagerId()))));

                    final TaskManagerSlotInformation slot = (TaskManagerSlotInformation) getTaskManagerTracker().getAllocatedOrPendingSlot(allocationId).get();
                    Assert.assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
                });
            }
        };
    }

    /**
     * Declares the same single-slot requirements twice and asserts that the allocate-resource
     * callback is invoked for the first declaration but not a second time for the duplicate.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
        // Completed by the first and by the second invocation of the allocate-resource callback.
        final CompletableFuture<Void> firstAllocationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> secondAllocationFuture = new CompletableFuture<>();
        final ResourceRequirements requirements = createResourceRequirementsForSingleSlot();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceActionsBuilder.setAllocateResourceConsumer(ignored -> {
                    if (firstAllocationFuture.isDone()) {
                        secondAllocationFuture.complete(null);
                    } else {
                        firstAllocationFuture.complete(null);
                    }
                });
                runTest(() -> {
                    runInMainThread(() -> getSlotManager().processResourceRequirements(requirements));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(firstAllocationFuture);

                    // Re-declaring identical requirements must not request another resource.
                    runInMainThread(() -> getSlotManager().processResourceRequirements(requirements));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(secondAllocationFuture);
                });
            }
        };
    }

    /**
     * Runs {@code testResourceCanBeAllocatedForDifferentJobAfterFree} with the second requirements
     * declared before the first slot is freed.
     */
    @Test
    public void testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree() throws Exception {
        testResourceCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime.BEFORE_FREE);
    }

    /**
     * Runs {@code testResourceCanBeAllocatedForDifferentJobAfterFree} with the second requirements
     * declared after the first slot is freed.
     */
    @Test
    public void testResourceCanBeAllocatedForDifferentJobWithDeclarationAfterSlotFree() throws Exception {
        testResourceCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime.AFTER_FREE);
    }

    /**
     * Allocates a slot for a first set of requirements, clears those requirements and frees the
     * slot, and asserts that a slot is then allocated for a second set of requirements.
     *
     * <p>The second requirements are declared either before or after the free, depending on the
     * given parameter. Both requirement sets come from {@code createResourceRequirementsForSingleSlot()};
     * presumably each call uses a distinct job id (TODO confirm against the base class).
     *
     * @param secondRequirementDeclarationTime whether the second requirements are declared before
     *     or after the first slot is freed
     * @throws Exception if the test harness or one of the assertions fails
     */
    private void testResourceCanBeAllocatedForDifferentJobAfterFree(final SecondRequirementDeclarationTime secondRequirementDeclarationTime) throws Exception {
        final CompletableFuture<AllocationID> firstAllocationFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> secondAllocationFuture = new CompletableFuture<>();
        final ResourceRequirements firstRequirements = createResourceRequirementsForSingleSlot();
        final ResourceRequirements secondRequirements = createResourceRequirementsForSingleSlot();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            // The first slot request completes the first future, any later request the second one.
            if (firstAllocationFuture.isDone()) {
                secondAllocationFuture.complete(tuple6.f2);
            } else {
                firstAllocationFuture.complete(tuple6.f2);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        final ResourceID resourceId = ResourceID.generate();
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
        final SlotReport slotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().registerTaskManager(taskExecutorConnection, slotReport, FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        getSlotManager().processResourceRequirements(firstRequirements);
                    });
                    final AllocationID firstAllocationId = FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(firstAllocationFuture);
                    Assert.assertEquals("The slot has not been allocated to the expected job id.", firstRequirements.getJobId(), ((TaskManagerSlotInformation) getTaskManagerTracker().getAllocatedOrPendingSlot(firstAllocationId).get()).getJobId());

                    if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.BEFORE_FREE) {
                        runInMainThread(() -> getSlotManager().processResourceRequirements(secondRequirements));
                    }

                    runInMainThread(() -> {
                        // Clear the first declaration by re-declaring it with no requirements, then free its slot.
                        getSlotManager().processResourceRequirements(ResourceRequirements.create(firstRequirements.getJobId(), firstRequirements.getTargetAddress(), Collections.emptyList()));
                        getSlotManager().freeSlot(SlotID.getDynamicSlotID(resourceId), firstAllocationId);
                    });

                    if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.AFTER_FREE) {
                        runInMainThread(() -> getSlotManager().processResourceRequirements(secondRequirements));
                    }

                    final AllocationID secondAllocationId = FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(secondAllocationFuture);
                    Assert.assertEquals("The slot has not been allocated to the expected job id.", secondRequirements.getJobId(), ((TaskManagerSlotInformation) getTaskManagerTracker().getAllocatedOrPendingSlot(secondAllocationId).get()).getJobId());
                });
            }
        };
    }

    /**
     * Declares requirements created with the argument {@code 2} and asserts that the
     * allocate-resource callback is invoked once but not twice. Then re-declares with the argument
     * {@code 3} for the same job and asserts that the callback is invoked a second time.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testRequestNewResources() throws Exception {
        final JobID jobId = new JobID();
        // Completed by the first and by the second invocation of the allocate-resource callback.
        final CompletableFuture<Void> firstAllocationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> secondAllocationFuture = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceActionsBuilder.setAllocateResourceConsumer(ignored -> {
                    if (firstAllocationFuture.isDone()) {
                        secondAllocationFuture.complete(null);
                    } else {
                        firstAllocationFuture.complete(null);
                    }
                });
                runTest(() -> {
                    runInMainThread(() -> getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 2)));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(firstAllocationFuture);
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(secondAllocationFuture);

                    runInMainThread(() -> getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 3)));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(secondAllocationFuture);
                });
            }
        };
    }

    /**
     * Fails the first slot request with a {@link SlotAllocationException} and asserts that the
     * task executor receives a second slot request.
     *
     * <p>After the retry, the task manager tracker must hold a slot for the job under the second
     * allocation id and none under the first.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testSlotRequestFailure() throws Exception {
        final JobID jobId = new JobID();
        final ResourceRequirements requirements = createResourceRequirementsForSingleSlot(jobId);
        // Response to the first slot request; completed exceptionally later in the test.
        final CompletableFuture<Acknowledge> firstSlotRequestResponse = new CompletableFuture<>();
        final Iterator<CompletableFuture<Acknowledge>> slotRequestResponses = Arrays.asList(firstSlotRequestResponse, CompletableFuture.completedFuture(Acknowledge.get())).iterator();
        // Records the allocation id of every slot request that reaches the gateway.
        final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(FunctionUtils.uncheckedFunction(tuple6 -> {
            allocationIds.put(tuple6.f2);
            return slotRequestResponses.next();
        })).createTestingTaskExecutorGateway());
        final SlotReport slotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().registerTaskManager(taskExecutorConnection, slotReport, FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        getSlotManager().processResourceRequirements(requirements);
                    });
                    final AllocationID firstAllocationId = allocationIds.take();
                    Assert.assertThat(allocationIds, Matchers.is(Matchers.empty()));

                    runInMainThread(() -> {
                        firstSlotRequestResponse.completeExceptionally(new SlotAllocationException("Test exception."));
                    });

                    final AllocationID secondAllocationId = allocationIds.take();
                    Assert.assertThat(allocationIds, Matchers.is(Matchers.empty()));
                    Assert.assertEquals(jobId, ((TaskManagerSlotInformation) getTaskManagerTracker().getAllocatedOrPendingSlot(secondAllocationId).get()).getJobId());
                    Assert.assertFalse(getTaskManagerTracker().getAllocatedOrPendingSlot(firstAllocationId).isPresent());
                });
            }
        };
    }

    /**
     * Unregisters the task executor while its slot request is still pending, then acknowledges the
     * request, and asserts that no {@code System.exit} call was observed by the tracking security
     * manager.
     *
     * <p>The tracking security manager is installed for the duration of the test only. The
     * previously installed one is restored in a {@code finally} block so that a failing test does
     * not leak it into later tests.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testAllocationUpdatesIgnoredIfTaskExecutorUnregistered() throws Exception {
        // Response to the slot request; held back until the task executor has been unregistered.
        final CompletableFuture<Acknowledge> slotRequestResponse = new CompletableFuture<>();
        // Signals that the slot request reached the task executor gateway.
        final CompletableFuture<Void> slotRequestReceived = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            slotRequestReceived.complete(null);
            return slotRequestResponse;
        }).createTestingTaskExecutorGateway();
        final JobID jobId = new JobID();
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        final SlotReport slotReport = new SlotReport();

        final SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        final SecurityManager previousSecurityManager = System.getSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try {
            new FineGrainedSlotManagerTestBase.Context() {
                {
                    runTest(() -> {
                        runInMainThread(() -> {
                            getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1));
                            getSlotManager().registerTaskManager(taskExecutorConnection, slotReport, FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        });
                        FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(slotRequestReceived);
                        runInMainThread(() -> {
                            getSlotManager().unregisterTaskManager(taskExecutorConnection.getInstanceID(), FineGrainedSlotManagerTestBase.TEST_EXCEPTION);
                            slotRequestResponse.complete(Acknowledge.get());
                        });
                        Assert.assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), Matchers.is(false));
                    });
                }
            };
        } finally {
            System.setSecurityManager(previousSecurityManager);
        }
    }

    /**
     * Reports the requested slot as allocated through a slot report after the slot request has
     * been acknowledged, and asserts that no {@code System.exit} call was observed by the tracking
     * security manager.
     *
     * <p>The tracking security manager is installed for the duration of the test only. The
     * previously installed one is restored in a {@code finally} block so that a failing test does
     * not leak it into later tests.
     *
     * @throws Exception if the test harness or one of the assertions fails
     */
    @Test
    public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport() throws Exception {
        // Receives the allocation id of the slot request sent to the task executor gateway.
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            allocationIdFuture.complete(tuple6.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        final SlotReport slotReport = new SlotReport();

        final SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        final SecurityManager previousSecurityManager = System.getSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try {
            new FineGrainedSlotManagerTestBase.Context() {
                {
                    runTest(() -> {
                        runInMainThread(() -> {
                            getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(new JobID(), 1));
                            getSlotManager().registerTaskManager(taskExecutorConnection, slotReport, FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        });
                        final AllocationID allocationId = FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(allocationIdFuture);
                        runInMainThread(() -> {
                            getSlotManager().reportSlotStatus(taskExecutorConnection.getInstanceID(), new SlotReport(FineGrainedSlotManagerTestBase.createAllocatedSlotStatus(allocationId, FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                        });
                        Assert.assertThat(trackingSecurityManager.getSystemExitFuture().isDone(), Matchers.is(false));
                    });
                }
            };
        } finally {
            System.setSecurityManager(previousSecurityManager);
        }
    }
}
