package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.class */
public class JobMasterPartitionReleaseTest extends TestLogger {

    /**
     * Class-level JUnit temporary folder rule.
     * NOTE(review): not referenced by any code visible in this class; {@code TestSetup} creates its
     * own {@link TemporaryFolder} - confirm whether this rule is still needed.
     */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Timeout passed to the JobMaster gateway calls and to the RPC endpoint termination in {@code TestSetup}. */
    private static final Time testingTimeout = Time.seconds(10);
    /** RPC service shared by all tests; created in {@code setupClass()} and stopped in {@code teardownClass()}. */
    private static TestingRpcService rpcService;
    /** Fatal error handler created in {@code setup()} and checked via {@code rethrowError()} in {@code teardown()}. */
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest$AllocationIdsResourceManagerGateway.class */
    /**
     * Resource manager gateway that records the {@link AllocationID} of every slot request it
     * receives, so that a test can later offer a slot carrying a matching allocation id.
     */
    public static final class AllocationIdsResourceManagerGateway extends TestingResourceManagerGateway {
        /** Allocation ids of received slot requests, bounded to 10 entries. */
        private final BlockingQueue<AllocationID> allocationIds;

        private AllocationIdsResourceManagerGateway() {
            this.allocationIds = new ArrayBlockingQueue<>(10);
            setRequestSlotConsumer(slotRequest -> {
                // offer() does not block; a request arriving while the queue is full is dropped.
                this.allocationIds.offer(slotRequest.getAllocationId());
            });
        }

        /**
         * Blocks until a slot request has been recorded and returns its allocation id.
         *
         * @return the allocation id of the oldest recorded slot request that has not been taken yet
         */
        AllocationID takeAllocationId() {
            try {
                return this.allocationIds.take();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                ExceptionUtils.rethrow(e);
                // Presumably unreachable because rethrow(...) throws; present to satisfy the compiler.
                return null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest$TestSetup.class */
    /**
     * Test fixture that starts a {@link JobMaster} for a single-vertex job, registers the given
     * task executor gateway with it and offers one slot. The partition tracker callbacks are
     * exposed as futures that complete with the {@link ResourceID} passed to the tracker.
     *
     * <p>Intended to be used in a try-with-resources statement; {@link #close()} terminates the
     * JobMaster and deletes the temporary folder.
     */
    public static class TestSetup implements AutoCloseable {
        private final TemporaryFolder temporaryFolder = new TemporaryFolder();
        private final LocalUnresolvedTaskManagerLocation localTaskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        private final CompletableFuture<ResourceID> taskExecutorIdForStopTracking = new CompletableFuture<>();
        private final CompletableFuture<ResourceID> taskExecutorIdForPartitionRelease = new CompletableFuture<>();
        private final CompletableFuture<ResourceID> taskExecutorIdForPartitionReleaseOrPromote = new CompletableFuture<>();
        private JobMaster jobMaster;

        /**
         * Creates and starts the JobMaster, then registers the task executor and offers a slot.
         *
         * @param testingRpcService RPC service used to create the JobMaster and register gateways
         * @param fatalErrorHandler handler passed to the JobMaster
         * @param taskExecutorGateway gateway of the task executor to register at the JobMaster
         * @throws Exception if any setup step fails; resources acquired so far are released first
         */
        public TestSetup(TestingRpcService testingRpcService, FatalErrorHandler fatalErrorHandler, TaskExecutorGateway taskExecutorGateway) throws Exception {
            this.temporaryFolder.create();
            try {
                TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
                haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
                SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
                haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

                // Each tracker callback completes the matching future with the resource id it receives.
                TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
                partitionTracker.setStopTrackingAllPartitionsConsumer(this.taskExecutorIdForStopTracking::complete);
                partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(this.taskExecutorIdForPartitionRelease::complete);
                partitionTracker.setStopTrackingAndReleaseOrPromotePartitionsConsumer(this.taskExecutorIdForPartitionReleaseOrPromote::complete);

                Configuration configuration = new Configuration();
                configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());

                this.jobMaster = new JobMasterBuilder(JobGraphTestUtils.createSingleVertexJobGraph(), testingRpcService)
                        .withConfiguration(configuration)
                        .withHighAvailabilityServices(haServices)
                        .withJobManagerSharedServices(new TestingJobManagerSharedServicesBuilder().build())
                        .withFatalErrorHandler(fatalErrorHandler)
                        // NOTE(review): presumably (interval, timeout) in milliseconds - confirm against HeartbeatServices.
                        .withHeartbeatServices(new HeartbeatServices(1000L, 5000000L))
                        .withPartitionTrackerFactory(taskExecutorGatewayLookup -> partitionTracker)
                        .createJobMaster();
                this.jobMaster.start(JobMasterId.generate()).get();
                registerTaskExecutorAtJobMaster(testingRpcService, getJobMasterGateway(), taskExecutorGateway, rmLeaderRetrievalService);
            } catch (Throwable setupFailure) {
                // try-with-resources cannot close an object whose constructor threw, so clean up here.
                try {
                    releaseResources();
                } catch (Throwable cleanupFailure) {
                    setupFailure.addSuppressed(cleanupFailure);
                }
                throw setupFailure;
            }
        }

        /**
         * Registers a recording resource manager gateway and the task executor gateway at the RPC
         * service, announces the resource manager as leader, registers the task manager at the
         * JobMaster and finally offers one slot whose allocation id is taken from the slot request
         * recorded by the resource manager gateway.
         */
        private void registerTaskExecutorAtJobMaster(TestingRpcService testingRpcService, JobMasterGateway jobMasterGateway, TaskExecutorGateway taskExecutorGateway, SettableLeaderRetrievalService settableLeaderRetrievalService) throws ExecutionException, InterruptedException {
            AllocationIdsResourceManagerGateway resourceManagerGateway = new AllocationIdsResourceManagerGateway();
            testingRpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
            settableLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());

            testingRpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
            jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), this.localTaskManagerUnresolvedLocation, JobMasterPartitionReleaseTest.testingTimeout).get();

            // Blocks until the resource manager gateway has recorded a slot request.
            AllocationID allocationId = resourceManagerGateway.takeAllocationId();
            SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
            jobMasterGateway.offerSlots(this.localTaskManagerUnresolvedLocation.getResourceID(), Collections.singleton(slotOffer), JobMasterPartitionReleaseTest.testingTimeout).get();
        }

        /** Returns the self gateway of the JobMaster under test. */
        public JobMasterGateway getJobMasterGateway() {
            return this.jobMaster.getSelfGateway(JobMasterGateway.class);
        }

        /** Returns the resource id of the task manager location registered at the JobMaster. */
        public ResourceID getTaskExecutorResourceID() {
            return this.localTaskManagerUnresolvedLocation.getResourceID();
        }

        /** Future completed with the resource id passed to the tracker's stop-tracking-all-partitions callback. */
        public CompletableFuture<ResourceID> getStopTrackingPartitionsTargetResourceId() {
            return this.taskExecutorIdForStopTracking;
        }

        /** Future completed with the resource id passed to the tracker's stop-tracking-and-release-all callback. */
        public CompletableFuture<ResourceID> getReleasePartitionsTargetResourceId() {
            return this.taskExecutorIdForPartitionRelease;
        }

        /** Future completed with the resource id passed to the tracker's stop-tracking-and-release-or-promote callback. */
        public CompletableFuture<ResourceID> getReleaseOrPromotePartitionsTargetResourceId() {
            return this.taskExecutorIdForPartitionReleaseOrPromote;
        }

        /**
         * Terminates the JobMaster (if one was created) and deletes the temporary folder.
         *
         * @throws Exception if terminating the JobMaster fails
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            releaseResources();
        }

        /** Shared cleanup used by {@link #close()} and by the constructor's failure path. */
        private void releaseResources() throws Exception {
            try {
                if (this.jobMaster != null) {
                    RpcUtils.terminateRpcEndpoint(this.jobMaster, JobMasterPartitionReleaseTest.testingTimeout);
                }
            } finally {
                // Delete the folder even when terminating the endpoint fails.
                this.temporaryFolder.delete();
            }
        }
    }

    /** Creates the RPC service shared by all tests of this class. */
    @BeforeClass
    public static void setupClass() {
        JobMasterPartitionReleaseTest.rpcService = new TestingRpcService();
    }

    /** Installs a fresh fatal error handler before each test. */
    @Before
    public void setup() throws IOException {
        testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /**
     * Rethrows any error captured by the fatal error handler during the test and clears the
     * gateways registered at the shared RPC service.
     *
     * @throws Exception if the fatal error handler rethrows a recorded error
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            // Always clear the gateways, even when an error is rethrown, so that registrations
            // from a failed test do not leak into the next one via the shared RPC service.
            rpcService.clearGateways();
        }
    }

    /** Stops the shared RPC service, if one was created, and drops the reference to it. */
    @AfterClass
    public static void teardownClass() {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService();
        rpcService = null;
    }

    /**
     * Disconnects the task manager from the JobMaster and verifies that the partition tracker's
     * stop-tracking callback is invoked with the resource id of that task manager.
     */
    @Test
    public void testPartitionTableCleanupOnDisconnect() throws Exception {
        // Completed by the task executor gateway once the JobMaster disconnects from it; the
        // payload is only used as a completion signal, hence the Object type.
        CompletableFuture<Object> disconnectSignal = new CompletableFuture<>();
        TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setDisconnectJobManagerConsumer((jobID, cause) -> {
                    disconnectSignal.complete(jobID);
                })
                .createTestingTaskExecutorGateway();

        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, taskExecutorGateway)) {
            testSetup.getJobMasterGateway().disconnectTaskManager(testSetup.getTaskExecutorResourceID(), new Exception("test"));

            // Wait until the task executor has been told about the disconnect.
            disconnectSignal.get();

            MatcherAssert.assertThat(
                    testSetup.getStopTrackingPartitionsTargetResourceId().get(),
                    Matchers.equalTo(testSetup.getTaskExecutorResourceID()));
        }
    }

    /**
     * When the task reports {@code FINISHED}, the release-or-promote tracker callback is expected
     * to be invoked for the task executor.
     */
    @Test
    public void testPartitionReleaseOrPromotionOnJobSuccess() throws Exception {
        testPartitionReleaseOrPromotionOnJobTermination(
                TestSetup::getReleaseOrPromotePartitionsTargetResourceId,
                ExecutionState.FINISHED);
    }

    /**
     * When the task reports {@code FAILED}, the release tracker callback (not the
     * release-or-promote one) is expected to be invoked for the task executor.
     */
    @Test
    public void testPartitionReleaseOrPromotionOnJobFailure() throws Exception {
        testPartitionReleaseOrPromotionOnJobTermination(
                TestSetup::getReleasePartitionsTargetResourceId,
                ExecutionState.FAILED);
    }

    /**
     * Waits for the single task to be submitted, reports the given final execution state for it
     * to the JobMaster and verifies that the selected partition tracker callback is invoked with
     * the resource id of the task executor.
     *
     * @param function selects which tracker callback future of the {@link TestSetup} to await
     * @param executionState execution state reported for the submitted task
     * @throws Exception if setup, any awaited future or the assertion fails
     */
    private void testPartitionReleaseOrPromotionOnJobTermination(Function<TestSetup, CompletableFuture<ResourceID>> function, ExecutionState executionState) throws Exception {
        // Completed with the deployment descriptor once the JobMaster submits the task.
        CompletableFuture<TaskDeploymentDescriptor> submittedTask = new CompletableFuture<>();
        TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                    submittedTask.complete(taskDeploymentDescriptor);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();

        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, taskExecutorGateway)) {
            JobMasterGateway jobMasterGateway = testSetup.getJobMasterGateway();

            // Wait for the deployment so the job id and execution attempt id can be reported back.
            TaskDeploymentDescriptor deployment = submittedTask.get();
            jobMasterGateway.updateTaskExecutionState(
                    new TaskExecutionState(deployment.getJobId(), deployment.getExecutionAttemptId(), executionState));

            MatcherAssert.assertThat(
                    function.apply(testSetup).get(),
                    Matchers.equalTo(testSetup.getTaskExecutorResourceID()));
        }
    }
}
