/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.DefaultJobTable;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutor;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.TriConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class TaskExecutorPartitionLifecycleTest {
    private static final Duration timeout = Duration.ofSeconds(10L);
    private static TestingRpcService rpc;
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final JobID jobId = new JobID();
    private Configuration configuration;
    private TestingHeartbeatServices heartbeatServices;
    private ResourceID jmResourceId;
    private Duration duration = Duration.ofSeconds(15L);
    private Duration longDuration = Duration.ofSeconds(30L);
    private CompletableFuture<Void> disconnectTaskManagerFuture;
    private final String jobMasterAddress = "jm";
    @TempDir
    private Path tempDir;
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> TEST_EXECUTOR_SERVICE_RESOURCE;

    TaskExecutorPartitionLifecycleTest() {
    }

    @BeforeEach
    void setup() {
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setJobMasterLeaderRetriever(this.jobId, this.jobManagerLeaderRetriever);
        this.configuration = new Configuration();
        this.heartbeatServices = new TestingHeartbeatServices(50000L, 50000L);
        this.jmResourceId = ResourceID.generate();
        this.disconnectTaskManagerFuture = new CompletableFuture();
    }

    @AfterEach
    void shutdown() {
        rpc.clearGateways();
    }

    @BeforeAll
    static void setupClass() {
        rpc = new TestingRpcService();
    }

    @AfterAll
    static void shutdownClass() throws ExecutionException, InterruptedException {
        rpc.closeAsync().get();
    }

    @Test
    void testJobMasterConnectionTerminationAfterExternalRelease() throws Exception {
        this.testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion((TriConsumer<TaskExecutorGateway, JobID, ResultPartitionID>)((TriConsumer)(taskExecutorGateway, jobID, resultPartitionID) -> taskExecutorGateway.releasePartitions(jobID, Collections.singleton(resultPartitionID))));
    }

    @Test
    void testJobMasterConnectionTerminationAfterExternalPromotion() throws Exception {
        this.testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion((TriConsumer<TaskExecutorGateway, JobID, ResultPartitionID>)((TriConsumer)(taskExecutorGateway, jobID, resultPartitionID) -> taskExecutorGateway.promotePartitions(jobID, Collections.singleton(resultPartitionID))));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(TriConsumer<TaskExecutorGateway, JobID, ResultPartitionID> releaseOrPromoteCall) throws Exception {
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setDisconnectTaskManagerFunction(resourceID -> {
            disconnectFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        DefaultJobTable jobTable = DefaultJobTable.create();
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setJobTable((JobTable)jobTable).setShuffleEnvironment((ShuffleEnvironment<?, ?>)new NettyShuffleEnvironmentBuilder().build()).setTaskSlotTable(TaskExecutorPartitionLifecycleTest.createTaskSlotTable()).build();
        TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
        AtomicBoolean trackerIsTrackingPartitions = new AtomicBoolean(false);
        partitionTracker.setIsTrackingPartitionsForFunction(jobId -> trackerIsTrackingPartitions.get());
        CompletableFuture firstReleasePartitionsCallFuture = new CompletableFuture();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(firstReleasePartitionsCallFuture::complete);
        ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
        ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
        TestingTaskExecutor taskExecutor = this.createTestingTaskExecutor(taskManagerServices, partitionTracker);
        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();
            TaskSubmissionTestEnvironment.registerJobMasterConnection((JobTable)jobTable, this.jobId, jobMasterGateway, new NoOpTaskManagerActions(), taskExecutor.getMainThreadExecutableForTesting());
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            trackerIsTrackingPartitions.set(true);
            Assertions.assertThat(firstReleasePartitionsCallFuture).isNotDone();
            taskExecutorGateway.releasePartitions(this.jobId, Collections.singleton(new ResultPartitionID()));
            firstReleasePartitionsCallFuture.get();
            Assertions.assertThat(disconnectFuture).isNotDone();
            trackerIsTrackingPartitions.set(false);
            releaseOrPromoteCall.accept((Object)taskExecutorGateway, (Object)this.jobId, (Object)resultPartitionId);
            disconnectFuture.get();
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{taskExecutor});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{taskExecutor});
    }

    @Test
    void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
        CompletableFuture releasePartitionsForJobFuture = new CompletableFuture();
        this.testPartitionRelease(partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete), (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
            taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
            FlinkAssertions.assertThatFuture((CompletableFuture)releasePartitionsForJobFuture).eventuallySucceeds().isEqualTo((Object)jobId);
        });
    }

    @Test
    void testPartitionReleaseAfterReleaseCall() throws Exception {
        CompletableFuture releasePartitionsFuture = new CompletableFuture();
        this.testPartitionRelease(partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete), (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
            ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
            taskExecutorGateway.releasePartitions(jobId, Collections.singleton(resultPartitionId));
            Assertions.assertThat((Collection)((Collection)releasePartitionsFuture.get())).contains((Object[])new ResultPartitionID[]{resultPartitionId});
            Assertions.assertThat((Collection)((Collection)releasePartitionsFuture.get())).contains((Object[])new ResultPartitionID[]{resultPartitionId});
        });
    }

    @Test
    void testPartitionPromotion() throws Exception {
        CompletableFuture promotePartitionsFuture = new CompletableFuture();
        this.testPartitionRelease(partitionTracker -> partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete), (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
            ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
            taskExecutorGateway.promotePartitions(jobId, Collections.singleton(resultPartitionId));
            Assertions.assertThat((Collection)((Collection)promotePartitionsFuture.get())).contains((Object[])new ResultPartitionID[]{resultPartitionId});
        });
    }

    @Test
    void testClusterPartitionRelease() throws Exception {
        CompletableFuture releasePartitionsFuture = new CompletableFuture();
        this.testPartitionRelease(partitionTracker -> partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete), (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
            IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId();
            taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout);
            Assertions.assertThat((Collection)((Collection)releasePartitionsFuture.get())).contains((Object[])new IntermediateDataSetID[]{dataSetId});
        });
    }

    @Test
    void testEnableBatchJobRecoveryAndNotRetainPartitions() throws Exception {
        this.testJMCrashedAndPossibleRetainPartitions(false, true, releasePartitionsForJobFuture -> FlinkAssertions.assertThatFuture((CompletableFuture)releasePartitionsForJobFuture).willNotCompleteWithin(this.duration).eventuallySucceeds().isEqualTo((Object)this.jobId));
    }

    @Test
    void testEnableBatchJobRecoveryAndRetainPartitions() throws Exception {
        this.testJMCrashedAndPossibleRetainPartitions(true, true, releasePartitionsForJobFuture -> FlinkAssertions.assertThatFuture((CompletableFuture)releasePartitionsForJobFuture).willNotCompleteWithin(this.longDuration));
    }

    @Test
    void testDisableBatchJobRecoveryAndReleasePartitions() throws Exception {
        Consumer<CompletableFuture<JobID>> verifyAction = releasePartitionsForJobFuture -> FlinkAssertions.assertThatFuture((CompletableFuture)releasePartitionsForJobFuture).succeedsWithin(this.duration).isEqualTo((Object)this.jobId);
        this.testJMCrashedAndPossibleRetainPartitions(true, false, verifyAction);
    }

    private void testJMCrashedAndPossibleRetainPartitions(boolean enableRetainPartitions, boolean enableBatchJobRecovery, Consumer<CompletableFuture<JobID>> verifyAction) throws Exception {
        this.configuration.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, (Object)enableBatchJobRecovery);
        this.configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, (Object)this.duration);
        this.configuration.set(TaskManagerOptions.SLOT_TIMEOUT, (Object)Duration.ofMinutes(5L));
        CompletableFuture releasePartitionsForJobFuture = new CompletableFuture();
        this.testPartitionRelease(partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete), (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
            this.jobManagerLeaderRetriever.notifyListener(null, UUID.randomUUID());
            this.disconnectTaskManagerFuture.get();
            this.jobManagerLeaderRetriever.notifyListener("jm", UUID.randomUUID());
            if (enableRetainPartitions) {
                taskExecutor.getAndRetainPartitionWithMetrics(jobId);
            }
            verifyAction.accept(releasePartitionsForJobFuture);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
        final BlockerSync sync = new BlockerSync();
        ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager(){

            public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
                sync.blockNonInterruptible();
                super.releasePartition(partitionId, cause);
            }
        };
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(blockingResultPartitionManager).setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor()).build();
        final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<ResultPartitionID>();
        TaskExecutorPartitionTrackerImpl partitionTracker = new TaskExecutorPartitionTrackerImpl((ShuffleEnvironment)shuffleEnvironment){

            public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
                super.startTrackingPartition(producingJobId, partitionInfo);
                startTrackingFuture.complete(partitionInfo.getResultPartitionId());
            }
        };
        try {
            this.internalTestPartitionRelease((TaskExecutorPartitionTracker)partitionTracker, (ShuffleEnvironment<?, ?>)shuffleEnvironment, startTrackingFuture, (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
                IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId();
                taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout);
                taskExecutorGateway.canBeReleased().get(5L, TimeUnit.SECONDS);
            });
        }
        finally {
            sync.releaseBlocker();
        }
    }

    private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
        TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
        CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<ResultPartitionID>();
        partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
        partitionTrackerSetup.accept(partitionTracker);
        this.internalTestPartitionRelease(partitionTracker, (ShuffleEnvironment<?, ?>)new NettyShuffleEnvironmentBuilder().build(), startTrackingFuture, testAction);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void internalTestPartitionRelease(TaskExecutorPartitionTracker partitionTracker, ShuffleEnvironment<?, ?> shuffleEnvironment, CompletableFuture<ResultPartitionID> startTrackingFuture, TestAction testAction) throws Exception {
        ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
        ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
        TaskDeploymentDescriptor taskDeploymentDescriptor = TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(this.jobId, "job", eid1, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Sender", 1, 1, new Configuration(), new Configuration(), TestingInvokable.class.getName(), Collections.singletonList(taskResultPartitionDescriptor), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
        TaskSlotTable<Task> taskSlotTable = TaskExecutorPartitionLifecycleTest.createTaskSlotTable();
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, false, Reference.owned((Object)new File[]{TempDirUtils.newFolder((Path)this.tempDir)}), Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).setShuffleEnvironment(shuffleEnvironment).build();
        CompletableFuture taskFinishedFuture = new CompletableFuture();
        OneShotLatch slotOfferedLatch = new OneShotLatch();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((ignoredJobId, ignoredTaskManagerRegistrationInformation) -> CompletableFuture.completedFuture(new JMTMRegistrationSuccess(this.jmResourceId))).setOfferSlotsFunction((resourceID, slotOffers) -> {
            slotOfferedLatch.trigger();
            return CompletableFuture.completedFuture(slotOffers);
        }).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
                taskFinishedFuture.complete(null);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setDisconnectTaskManagerFunction(ignored -> {
            this.disconnectTaskManagerFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        TestingTaskExecutor taskExecutor = this.createTestingTaskExecutor(taskManagerServices, partitionTracker);
        CompletableFuture initialSlotReportFuture = new CompletableFuture();
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            initialSlotReportFuture.complete((SlotReport)resourceIDInstanceIDSlotReportTuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(input -> CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("blobServerHost", 55555), null)));
        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
            rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            taskManagerServices.getJobLeaderService().addJob(this.jobId, "jm");
            this.jobManagerLeaderRetriever.notifyListener("jm", UUID.randomUUID());
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
            Optional slotStatusOptional = StreamSupport.stream(((SlotReport)initialSlotReportFuture.get()).spliterator(), false).findAny();
            Assertions.assertThat(slotStatusOptional).isPresent();
            SlotStatus slotStatus = (SlotStatus)slotStatusOptional.get();
            while (true) {
                try {
                    taskExecutorGateway.requestSlot(slotStatus.getSlotID(), this.jobId, taskDeploymentDescriptor.getAllocationId(), ResourceProfile.ZERO, "jm", testingResourceManagerGateway.getFencingToken(), timeout).get();
                }
                catch (Exception e) {
                    Thread.sleep(50L);
                    continue;
                }
                break;
            }
            TestingInvokable.sync = new BlockerSync();
            slotOfferedLatch.await();
            taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout).get();
            TestingInvokable.sync.awaitBlocker();
            FlinkAssertions.assertThatFuture(startTrackingFuture).eventuallySucceeds().isEqualTo((Object)taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID());
            TestingInvokable.sync.releaseBlocker();
            taskFinishedFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            testAction.accept(this.jobId, taskResultPartitionDescriptor, taskExecutor, taskExecutorGateway);
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{taskExecutor});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{taskExecutor});
        Assertions.assertThat((Collection)shuffleEnvironment.getPartitionsOccupyingLocalResources()).isEmpty();
    }

    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, TaskExecutorPartitionTracker partitionTracker) throws IOException {
        return new TestingTaskExecutor(rpc, TaskManagerConfiguration.fromConfiguration((Configuration)this.configuration, (TaskExecutorResourceSpec)TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution((Configuration)this.configuration), (String)InetAddress.getLoopbackAddress().getHostAddress(), (File)TestFileUtils.createTempDir()), this.haServices, taskManagerServices, ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, this.heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, NoOpTaskExecutorBlobService.INSTANCE, new TestingFatalErrorHandler(), partitionTracker, new DelegationTokenReceiverRepository(this.configuration, null));
    }

    private static TaskSlotTable<Task> createTaskSlotTable() {
        return TaskSlotUtils.createTaskSlotTable(1, timeout, (ScheduledExecutorService)TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor());
    }

    static {
        TEST_EXECUTOR_SERVICE_RESOURCE = TestingUtils.defaultExecutorExtension();
    }

    @FunctionalInterface
    private static interface TestAction {
        public void accept(JobID var1, ResultPartitionDeploymentDescriptor var2, TaskExecutor var3, TaskExecutorGateway var4) throws Exception;
    }

    @FunctionalInterface
    private static interface PartitionTrackerSetup {
        public void accept(TestingTaskExecutorPartitionTracker var1) throws Exception;
    }

    public static class TestingInvokable
    extends AbstractInvokable {
        static BlockerSync sync;

        public TestingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            sync.block();
        }
    }
}

