package org.apache.flink.runtime.jobmaster;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTrackerTest;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

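/**
 * Tests how the {@link JobMaster} interacts with its partition tracker when a task executor
 * disconnects and when the job terminates.
 *
 * <p>Decompiler note: this class was declared package-private in the class file it was loaded from
 * (org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.class).
 */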
public class JobMasterPartitionReleaseTest {

    @TempDir
    private static File temporaryFolder;
    private static final Duration testingTimeout = Duration.ofSeconds(10);
    private static TestingRpcService rpcService;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest$TestSetup.class */
    public static class TestSetup implements AutoCloseable {
        private final LocalUnresolvedTaskManagerLocation localTaskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        private final CompletableFuture<ResourceID> taskExecutorIdForStopTracking = new CompletableFuture<>();
        private final CompletableFuture<Collection<ResultPartitionID>> partitionsForRelease = new CompletableFuture<>();
        private final CompletableFuture<Collection<ResultPartitionID>> clusterPartitionsForPromote = new CompletableFuture<>();
        private final JobMaster jobMaster;
        private final TestingJobMasterPartitionTracker partitionTracker;

        public TestSetup(TestingRpcService testingRpcService, FatalErrorHandler fatalErrorHandler, TaskExecutorGateway taskExecutorGateway) throws Exception {
            TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
            testingHighAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
            testingHighAvailabilityServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService(null, null));
            this.partitionTracker = new TestingJobMasterPartitionTracker();
            TestingJobMasterPartitionTracker testingJobMasterPartitionTracker = this.partitionTracker;
            CompletableFuture<ResourceID> completableFuture = this.taskExecutorIdForStopTracking;
            Objects.requireNonNull(completableFuture);
            testingJobMasterPartitionTracker.setStopTrackingAllPartitionsConsumer((v1) -> {
                r1.complete(v1);
            });
            TestingJobMasterPartitionTracker testingJobMasterPartitionTracker2 = this.partitionTracker;
            CompletableFuture<Collection<ResultPartitionID>> completableFuture2 = this.partitionsForRelease;
            Objects.requireNonNull(completableFuture2);
            testingJobMasterPartitionTracker2.setStopTrackingAndReleasePartitionsConsumer((v1) -> {
                r1.complete(v1);
            });
            TestingJobMasterPartitionTracker testingJobMasterPartitionTracker3 = this.partitionTracker;
            CompletableFuture<Collection<ResultPartitionID>> completableFuture3 = this.clusterPartitionsForPromote;
            Objects.requireNonNull(completableFuture3);
            testingJobMasterPartitionTracker3.setStopTrackingAndPromotePartitionsConsumer((v1) -> {
                r1.complete(v1);
            });
            Configuration configuration = new Configuration();
            configuration.set(BlobServerOptions.STORAGE_DIRECTORY, JobMasterPartitionReleaseTest.temporaryFolder.getAbsolutePath());
            HeartbeatServices heartbeatServicesImpl = new HeartbeatServicesImpl(1000L, 5000000L);
            JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
            this.jobMaster = new JobMasterBuilder(singleNoOpJobGraph, testingRpcService).withConfiguration(configuration).withHighAvailabilityServices(testingHighAvailabilityServices).withFatalErrorHandler(fatalErrorHandler).withHeartbeatServices(heartbeatServicesImpl).withPartitionTrackerFactory(taskExecutorGatewayLookup -> {
                return this.partitionTracker;
            }).createJobMaster();
            this.jobMaster.start();
            registerTaskExecutorAtJobMaster(testingRpcService, getJobMasterGateway(), singleNoOpJobGraph.getJobID(), taskExecutorGateway);
        }

        private void registerTaskExecutorAtJobMaster(TestingRpcService testingRpcService, JobMasterGateway jobMasterGateway, JobID jobID, TaskExecutorGateway taskExecutorGateway) throws ExecutionException, InterruptedException {
            testingRpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
            jobMasterGateway.registerTaskManager(jobID, TaskManagerRegistrationInformation.create(taskExecutorGateway.getAddress(), this.localTaskManagerUnresolvedLocation, TestingUtils.zeroUUID()), JobMasterPartitionReleaseTest.testingTimeout).get();
            jobMasterGateway.offerSlots(this.localTaskManagerUnresolvedLocation.getResourceID(), Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY)), JobMasterPartitionReleaseTest.testingTimeout).get();
        }

        public TestingJobMasterPartitionTracker getPartitionTracker() {
            return this.partitionTracker;
        }

        public JobMasterGateway getJobMasterGateway() {
            return this.jobMaster.getSelfGateway(JobMasterGateway.class);
        }

        public ResourceID getTaskExecutorResourceID() {
            return this.localTaskManagerUnresolvedLocation.getResourceID();
        }

        public CompletableFuture<ResourceID> getStopTrackingPartitionsTargetResourceId() {
            return this.taskExecutorIdForStopTracking;
        }

        public CompletableFuture<Collection<ResultPartitionID>> getPartitionsForRelease() {
            return this.partitionsForRelease;
        }

        public CompletableFuture<Collection<ResultPartitionID>> getPartitionsForReleaseOrPromote() {
            return this.partitionsForRelease.thenCombine((CompletionStage) this.clusterPartitionsForPromote, (collection, collection2) -> {
                HashSet hashSet = new HashSet();
                hashSet.addAll(collection);
                hashSet.addAll(collection2);
                return hashSet;
            });
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            if (this.jobMaster != null) {
                RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{this.jobMaster});
            }
        }
    }

    /** Package-private no-arg constructor; performs no initialization of its own. */
    JobMasterPartitionReleaseTest() {}

    /**
     * Creates the static {@link TestingRpcService} once before any test runs.
     *
     * <p>Declared package-private because JUnit Jupiter lifecycle methods must not be private.
     */
    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /**
     * Installs a fresh {@link TestingFatalErrorHandler} before each test.
     *
     * <p>Declared package-private because JUnit Jupiter lifecycle methods must not be private.
     */
    @BeforeEach
    void setup() throws IOException {
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /**
     * Rethrows any error recorded by the fatal error handler and clears the gateways registered at
     * the RPC service.
     *
     * <p>Declared package-private because JUnit Jupiter lifecycle methods must not be private.
     *
     * @throws Exception if the fatal error handler rethrows a recorded error
     */
    @AfterEach
    void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            // Clear gateways even when an error is rethrown, so that registrations made by a
            // failing test do not remain on the shared RPC service for the next test.
            rpcService.clearGateways();
        }
    }

    /**
     * Triggers asynchronous shutdown of the static RPC service, if one was created, and drops the
     * reference to it. The returned shutdown future is not awaited.
     *
     * <p>Declared package-private because JUnit Jupiter lifecycle methods must not be private.
     */
    @AfterAll
    static void teardownClass() {
        if (rpcService != null) {
            rpcService.closeAsync();
            rpcService = null;
        }
    }

    /**
     * Disconnects the registered task executor from the job master and checks that the partition
     * tracker's stop-tracking-all-partitions callback is invoked with that task executor's
     * resource ID.
     */
    @Test
    void testPartitionTableCleanupOnDisconnect() throws Exception {
        final CompletableFuture<JobID> disconnectedJobId = new CompletableFuture<>();
        final TaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setDisconnectJobManagerConsumer(
                                (jobID, cause) -> disconnectedJobId.complete(jobID))
                        .createTestingTaskExecutorGateway();

        // try-with-resources closes the fixture exactly once, including when close() itself fails.
        try (TestSetup testSetup =
                new TestSetup(rpcService, this.testingFatalErrorHandler, taskExecutorGateway)) {
            testSetup
                    .getJobMasterGateway()
                    .disconnectTaskManager(
                            testSetup.getTaskExecutorResourceID(), new Exception("test"));

            // Block until the task executor gateway has been notified of the disconnect.
            disconnectedJobId.get();

            FlinkAssertions.assertThatFuture(testSetup.getStopTrackingPartitionsTargetResourceId())
                    .eventuallySucceeds()
                    .isEqualTo(testSetup.getTaskExecutorResourceID());
        }
    }

    /**
     * Runs the job-termination scenario with {@link ExecutionState#FINISHED} and expects the tracked
     * partitions to show up in the combined release-or-promote future.
     */
    @Test
    void testPartitionReleaseOrPromotionOnJobSuccess() throws Exception {
        testPartitionReleaseOrPromotionOnJobTermination(
                TestSetup::getPartitionsForReleaseOrPromote, ExecutionState.FINISHED);
    }

    /**
     * Runs the job-termination scenario with {@link ExecutionState#FAILED} and expects the tracked
     * partitions to show up in the release future.
     */
    @Test
    void testPartitionReleaseOrPromotionOnJobFailure() throws Exception {
        testPartitionReleaseOrPromotionOnJobTermination(
                TestSetup::getPartitionsForRelease, ExecutionState.FAILED);
    }

    /**
     * Shared scenario: waits for the job's task to be submitted, reports the given state for that
     * execution attempt, and asserts that the future selected by {@code function} completes with
     * exactly the two partition IDs the tracker reports as tracked.
     *
     * @param function selects which captured future of the {@link TestSetup} to assert on
     * @param executionState state reported to the job master for the submitted task
     * @throws Exception if the setup fails or any awaited future completes exceptionally
     */
    private void testPartitionReleaseOrPromotionOnJobTermination(
            Function<TestSetup, CompletableFuture<Collection<ResultPartitionID>>> function,
            ExecutionState executionState)
            throws Exception {
        final CompletableFuture<TaskDeploymentDescriptor> submittedTask = new CompletableFuture<>();
        final TaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setSubmitTaskConsumer(
                                (taskDeploymentDescriptor, jobMasterId) -> {
                                    submittedTask.complete(taskDeploymentDescriptor);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .createTestingTaskExecutorGateway();

        // try-with-resources closes the fixture exactly once, including when close() itself fails.
        try (TestSetup testSetup =
                new TestSetup(rpcService, this.testingFatalErrorHandler, taskExecutorGateway)) {
            final ResultPartitionID firstPartitionId = new ResultPartitionID();
            final ResultPartitionID secondPartitionId = new ResultPartitionID();

            // Make the tracker report two tracked partitions, created with differing flag values.
            testSetup
                    .getPartitionTracker()
                    .setGetAllTrackedPartitionsSupplier(
                            () ->
                                    Arrays.asList(
                                            AbstractPartitionTrackerTest
                                                    .createResultPartitionDeploymentDescriptor(
                                                            firstPartitionId, true),
                                            AbstractPartitionTrackerTest
                                                    .createResultPartitionDeploymentDescriptor(
                                                            secondPartitionId, false)));

            // Blocks until the task has been submitted to the task executor gateway.
            final TaskDeploymentDescriptor descriptor = submittedTask.get();
            testSetup
                    .getJobMasterGateway()
                    .updateTaskExecutionState(
                            new TaskExecutionState(
                                    descriptor.getExecutionAttemptId(), executionState));

            Assertions.assertThat(function.apply(testSetup).get())
                    .containsExactlyInAnyOrder(firstPartitionId, secondPartitionId);
        }
    }
}
