package org.apache.flink.runtime.jobmaster;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTracker;
import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTrackerTestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolServiceFactory;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolServiceBuilder;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
import org.apache.flink.runtime.shuffle.DefaultPartitionWithMetrics;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/* JADX INFO: Access modifiers changed from: package-private */
@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest.class */
public class JobMasterTest {

    @TempDir
    private Path temporaryFolder;
    private static final long fastHeartbeatInterval = 1;
    private static final long heartbeatInterval = 1000;
    private static final long heartbeatTimeout = 5000000;
    private static TestingRpcService rpcService;
    private static HeartbeatServices fastHeartbeatServices;
    private static HeartbeatServices heartbeatServices;
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
    private static final long fastHeartbeatTimeout = 10;
    private static final Duration testingTimeout = Duration.ofSeconds(fastHeartbeatTimeout);
    private static final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$DummyCheckpointStorageLocation.class */
    private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation {
        private static final long serialVersionUID = 164095949572620688L;

        private DummyCheckpointStorageLocation() {
        }

        public String getExternalPointer() {
            return null;
        }

        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        public void disposeStorageLocation() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplit.class */
    public static final class TestingInputSplit implements InputSplit {
        private static final long serialVersionUID = -5404803705463116083L;
        private final int splitNumber;

        TestingInputSplit(int i) {
            this.splitNumber = i;
        }

        public int getSplitNumber() {
            return this.splitNumber;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.splitNumber == ((TestingInputSplit) obj).splitNumber;
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.splitNumber));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplitSource.class */
    public static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
        private static final long serialVersionUID = -2344684048759139086L;
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> list) {
            this.inputSplits = list;
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public TestingInputSplit[] m226createInputSplits(int i) {
            return (TestingInputSplit[]) this.inputSplits.toArray(JobMasterTest.EMPTY_TESTING_INPUT_SPLITS);
        }

        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] testingInputSplitArr) {
            return new DefaultInputSplitAssigner(testingInputSplitArr);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingSlotPool.class */
    private static final class TestingSlotPool implements SlotPool, SlotPoolService {
        private final JobID jobId;
        private final OneShotLatch hasReceivedSlotOffers;
        private final Map<ResourceID, Collection<PhysicalSlot>> registeredSlots = new HashMap(16);

        private TestingSlotPool(JobID jobID, OneShotLatch oneShotLatch) {
            this.jobId = jobID;
            this.hasReceivedSlotOffers = oneShotLatch;
        }

        public void start(JobMasterId jobMasterId, String str) {
        }

        public void close() {
            clear();
        }

        private void clear() {
            this.registeredSlots.clear();
        }

        public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public void disconnectResourceManager() {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public boolean registerTaskManager(ResourceID resourceID) {
            this.registeredSlots.computeIfAbsent(resourceID, resourceID2 -> {
                return new ArrayList(16);
            });
            return true;
        }

        public boolean releaseTaskManager(ResourceID resourceID, Exception exc) {
            this.registeredSlots.remove(resourceID);
            return true;
        }

        public void releaseFreeSlotsOnTaskManager(ResourceID resourceID, Exception exc) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> collection) {
            this.hasReceivedSlotOffers.trigger();
            Collection collection2 = (Collection) Optional.ofNullable(this.registeredSlots.get(taskManagerLocation.getResourceID())).orElseThrow(() -> {
                return new FlinkRuntimeException("TaskManager not registered.");
            });
            int size = collection2.size();
            Iterator<SlotOffer> it = collection.iterator();
            while (it.hasNext()) {
                collection2.add(TestingPhysicalSlot.builder().withAllocationID(it.next().getAllocationId()).withTaskManagerLocation(taskManagerLocation).withTaskManagerGateway(taskManagerGateway).withPhysicalSlotNumber(size).build());
                size++;
            }
            return collection;
        }

        public Optional<ResourceID> failAllocation(@Nullable ResourceID resourceID, AllocationID allocationID, Exception exc) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public FreeSlotTracker getFreeSlotTracker() {
            return FreeSlotTrackerTestUtils.createDefaultFreeSlotTracker((Map) this.registeredSlots.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toMap((v0) -> {
                return v0.getAllocationId();
            }, physicalSlot -> {
                return physicalSlot;
            })));
        }

        public Collection<SlotInfo> getAllocatedSlotsInformation() {
            return Collections.emptyList();
        }

        public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile resourceProfile) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> collection, @Nullable Duration duration) {
            return new CompletableFuture<>();
        }

        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> collection) {
            return new CompletableFuture<>();
        }

        public void disableBatchSlotRequestTimeoutCheck() {
        }

        public AllocatedSlotReport createAllocatedSlotReport(ResourceID resourceID) {
            return new AllocatedSlotReport(this.jobId, (List) this.registeredSlots.getOrDefault(resourceID, Collections.emptyList()).stream().map(physicalSlot -> {
                return new AllocatedSlotInfo(physicalSlot.getPhysicalSlotNumber(), physicalSlot.getAllocationId());
            }).collect(Collectors.toList()));
        }

        public void setIsJobRestarting(boolean z) {
        }

        public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable th) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingSlotPoolFactory.class */
    private static final class TestingSlotPoolFactory implements SlotPoolServiceFactory {
        private final OneShotLatch hasReceivedSlotOffers;

        public TestingSlotPoolFactory(OneShotLatch oneShotLatch) {
            this.hasReceivedSlotOffers = oneShotLatch;
        }

        @Nonnull
        public SlotPoolService createSlotPoolService(@Nonnull JobID jobID, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
            return new TestingSlotPool(jobID, this.hasReceivedSlotOffers);
        }
    }

    JobMasterTest() {
    }

    @BeforeAll
    static void setupAll() {
        rpcService = new TestingRpcService();
        fastHeartbeatServices = new HeartbeatServicesImpl(fastHeartbeatInterval, fastHeartbeatTimeout, -1);
        heartbeatServices = new HeartbeatServicesImpl(heartbeatInterval, heartbeatTimeout, 1);
    }

    @BeforeEach
    void setup() throws IOException {
        this.configuration = new Configuration();
        this.haServices = new TestingHighAvailabilityServices();
        this.jobMasterId = JobMasterId.generate();
        this.jmResourceId = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        this.rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        this.haServices.setResourceManagerLeaderRetriever(this.rmLeaderRetrievalService);
        this.configuration.set(BlobServerOptions.STORAGE_DIRECTORY, Files.createTempDirectory(this.temporaryFolder, UUID.randomUUID().toString(), new FileAttribute[0]).toString());
    }

    @AfterEach
    void teardown() throws Exception {
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
        }
        rpcService.clearGateways();
    }

    @AfterAll
    static void teardownAll() {
        if (rpcService != null) {
            rpcService.closeAsync();
            rpcService = null;
        }
    }

    @Test
    void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerFunction((resourceID, allocatedSlotReport) -> {
            completableFuture.complete(resourceID);
            return FutureUtils.completedVoidFuture();
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withResourceId(this.jmResourceId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(new HeartbeatServicesImpl(fastHeartbeatInterval, 10000L)).createJobMaster();
        try {
            createJobMaster.start();
            createJobMaster.getSelfGateway(JobMasterGateway.class).registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway.getAddress(), localUnresolvedTaskManagerLocation, TestingUtils.zeroUUID()), testingTimeout).get();
            Assertions.assertThat((ResourceID) completableFuture.join()).satisfiesAnyOf(new ThrowingConsumer[]{resourceID2 -> {
                Assertions.assertThat(resourceID2).isNull();
            }, resourceID3 -> {
                Assertions.assertThat(resourceID3).isEqualTo(this.jmResourceId);
            }});
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testHeartbeatTimeoutWithTaskManager() throws Exception {
        runHeartbeatTest(new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerFunction((resourceID, allocatedSlotReport) -> {
            return FutureUtils.completedVoidFuture();
        }), fastHeartbeatServices);
    }

    private void runHeartbeatTest(TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder, HeartbeatServices heartbeatServices2) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        RpcGateway createTestingTaskExecutorGateway = testingTaskExecutorGatewayBuilder.setDisconnectJobManagerConsumer((jobID, th) -> {
            completableFuture.complete(jobID);
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withResourceId(this.jmResourceId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices2).createJobMaster();
        try {
            createJobMaster.start();
            createJobMaster.getSelfGateway(JobMasterGateway.class).registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway.getAddress(), localUnresolvedTaskManagerLocation, TestingUtils.zeroUUID()), testingTimeout).get();
            Assertions.assertThat((JobID) completableFuture.get(testingTimeout.toMillis(), TimeUnit.MILLISECONDS)).isEqualTo(jobGraph.getJobID());
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th2) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Test
    void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        runHeartbeatTest(new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerFunction((resourceID, allocatedSlotReport) -> {
            return FutureUtils.completedExceptionally(new RecipientUnreachableException("sender", "recipient", "test heartbeat target is unreachable"));
        }), heartbeatServices);
    }

    @Test
    void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        OneShotLatch oneShotLatch = new OneShotLatch();
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerFunction((resourceID, allocatedSlotReport) -> {
            try {
                if (oneShotLatch.isTriggered()) {
                    Assertions.assertThat(allocatedSlotReport.getAllocatedSlotInfos()).hasSize(1);
                } else {
                    Assertions.assertThat(allocatedSlotReport.getAllocatedSlotInfos()).isEmpty();
                }
            } catch (AssertionError e) {
                completableFuture.completeExceptionally(e);
            }
            if (atomicBoolean.get()) {
                completableFuture.complete(null);
            }
            return FutureUtils.completedVoidFuture();
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        JobMaster createJobMaster = new JobMasterBuilder(singleNoOpJobGraph, rpcService).withHeartbeatServices(new HeartbeatServicesImpl(5L, heartbeatInterval)).withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(new TestingSlotPoolFactory(oneShotLatch), new DefaultSchedulerFactory())).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            selfGateway.registerTaskManager(singleNoOpJobGraph.getJobID(), TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway.getAddress(), localUnresolvedTaskManagerLocation, TestingUtils.zeroUUID()), testingTimeout).get();
            SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            Assertions.assertThat((Collection) selfGateway.offerSlots(localUnresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get()).containsExactly(new SlotOffer[]{slotOffer});
            atomicBoolean.set(true);
            completableFuture.get();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testHeartbeatTimeoutWithResourceManager() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(generate, new ResourceID("rm"), "rm", "localhost");
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        testingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
            completableFuture.complete(Tuple3.of(jobMasterId, resourceID, jobID));
            countDownLatch.countDown();
            return CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess());
        });
        testingResourceManagerGateway.setDisconnectJobManagerConsumer(tuple3 -> {
            completableFuture2.complete((JobID) tuple3.f0);
        });
        rpcService.registerGateway("rm", testingResourceManagerGateway);
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withJobMasterId(this.jobMasterId).withResourceId(this.jmResourceId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(fastHeartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            this.rmLeaderRetrievalService.notifyListener("rm", generate.toUUID());
            Tuple3 tuple32 = (Tuple3) completableFuture.get(testingTimeout.toMillis(), TimeUnit.MILLISECONDS);
            Assertions.assertThat((JobMasterId) tuple32.f0).isEqualTo(this.jobMasterId);
            Assertions.assertThat((ResourceID) tuple32.f1).isEqualTo(this.jmResourceId);
            Assertions.assertThat((JobID) tuple32.f2).isEqualTo(jobGraph.getJobID());
            Assertions.assertThat((JobID) completableFuture2.get(testingTimeout.toMillis(), TimeUnit.MILLISECONDS)).isEqualTo(jobGraph.getJobID());
            countDownLatch.await();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        ResourceID resourceID = new ResourceID("rm");
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(generate, resourceID, "rm", "localhost");
        CompletableFuture completableFuture = new CompletableFuture();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        ArrayDeque arrayDeque = new ArrayDeque(2);
        arrayDeque.add(CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess()));
        arrayDeque.add(new CompletableFuture());
        testingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID2, str, jobID) -> {
            countDownLatch.countDown();
            return (CompletableFuture) arrayDeque.poll();
        });
        testingResourceManagerGateway.setDisconnectJobManagerConsumer(tuple3 -> {
            completableFuture.complete((JobID) tuple3.f0);
        });
        testingResourceManagerGateway.setJobMasterHeartbeatFunction(resourceID3 -> {
            return FutureUtils.completedExceptionally(new RecipientUnreachableException("sender", "recipient", "resource manager is unreachable"));
        });
        rpcService.registerGateway("rm", testingResourceManagerGateway);
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withJobMasterId(this.jobMasterId).withResourceId(this.jmResourceId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            this.rmLeaderRetrievalService.notifyListener("rm", generate.toUUID());
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
                selfGateway.heartbeatFromResourceManager(resourceID);
                return Boolean.valueOf(completableFuture.isDone());
            }, 50L);
            Assertions.assertThat((JobID) completableFuture.join()).isEqualTo(jobGraph.getJobID());
            countDownLatch.await();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRestoringFromSavepoint() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(createSavepoint(42L).getAbsolutePath(), true));
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        this.haServices.setCheckpointRecoveryFactory(PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(i -> {
            return standaloneCompletedCheckpointStore;
        }));
        JobMaster createJobMaster = new JobMasterBuilder(createJobGraphWithCheckpointing, rpcService).withHighAvailabilityServices(this.haServices).createJobMaster();
        try {
            createJobMaster.start();
            OneShotLatch oneShotLatch = new OneShotLatch();
            registerSlotsAtJobMaster(1, (JobMasterGateway) createJobMaster.getSelfGateway(JobMasterGateway.class), createJobGraphWithCheckpointing.getJobID(), new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                oneShotLatch.trigger();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            oneShotLatch.await();
            CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
            Assertions.assertThat(latestCheckpoint).isNotNull();
            Assertions.assertThat(latestCheckpoint.getCheckpointID()).isEqualTo(42L);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testCheckpointPrecedesSavepointRecovery() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(createSavepoint(42L).getAbsolutePath(), true));
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(createJobGraphWithCheckpointing.getJobID(), fastHeartbeatInterval, fastHeartbeatInterval, fastHeartbeatInterval, Collections.emptyMap(), (Collection) null, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new DummyCheckpointStorageLocation(), (CompletedCheckpointStats) null);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        standaloneCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {
        });
        this.haServices.setCheckpointRecoveryFactory(PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(i -> {
            return standaloneCompletedCheckpointStore;
        }));
        JobMaster createJobMaster = new JobMasterBuilder(createJobGraphWithCheckpointing, rpcService).createJobMaster();
        try {
            CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
            Assertions.assertThat(latestCheckpoint).isNotNull();
            Assertions.assertThat(latestCheckpoint.getCheckpointID()).isEqualTo(fastHeartbeatInterval);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testCloseUnestablishedResourceManagerConnection() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).createJobMaster();
        try {
            createJobMaster.start();
            TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway2 = createAndRegisterTestingResourceManagerGateway();
            OneShotLatch oneShotLatch = new OneShotLatch();
            OneShotLatch oneShotLatch2 = new OneShotLatch();
            createAndRegisterTestingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
                oneShotLatch.trigger();
                return CompletableFuture.completedFuture(createAndRegisterTestingResourceManagerGateway.getJobMasterRegistrationSuccess());
            });
            createAndRegisterTestingResourceManagerGateway2.setRegisterJobManagerFunction((jobMasterId2, resourceID2, str2, jobID2) -> {
                oneShotLatch2.trigger();
                return CompletableFuture.completedFuture(createAndRegisterTestingResourceManagerGateway2.getJobMasterRegistrationSuccess());
            });
            notifyResourceManagerLeaderListeners(createAndRegisterTestingResourceManagerGateway);
            oneShotLatch.await();
            notifyResourceManagerLeaderListeners(createAndRegisterTestingResourceManagerGateway2);
            oneShotLatch2.await();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testReconnectionAfterDisconnect() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withJobMasterId(this.jobMasterId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            createAndRegisterTestingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
                arrayBlockingQueue.offer(jobMasterId);
                return CompletableFuture.completedFuture(createAndRegisterTestingResourceManagerGateway.getJobMasterRegistrationSuccess());
            });
            ResourceManagerId m373getFencingToken = createAndRegisterTestingResourceManagerGateway.m373getFencingToken();
            notifyResourceManagerLeaderListeners(createAndRegisterTestingResourceManagerGateway);
            Assertions.assertThat((JobMasterId) arrayBlockingQueue.take()).isEqualTo(this.jobMasterId);
            Assertions.assertThat(arrayBlockingQueue).isEmpty();
            selfGateway.disconnectResourceManager(m373getFencingToken, new FlinkException("Test exception"));
            Assertions.assertThat((JobMasterId) arrayBlockingQueue.take()).isEqualTo(this.jobMasterId);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testResourceManagerConnectionAfterStart() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withJobMasterId(this.jobMasterId).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            createAndRegisterTestingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
                arrayBlockingQueue.offer(jobMasterId);
                return CompletableFuture.completedFuture(createAndRegisterTestingResourceManagerGateway.getJobMasterRegistrationSuccess());
            });
            notifyResourceManagerLeaderListeners(createAndRegisterTestingResourceManagerGateway);
            createJobMaster.start();
            Assertions.assertThat((JobMasterId) arrayBlockingQueue.take()).isEqualTo(this.jobMasterId);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
    @Test
    void testRequestNextInputSplitWithLocalFailover() throws Exception {
        this.configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        runRequestNextInputSplitTest(list -> {
            return (Collection) list.get(0);
        });
    }

    @Test
    void testRequestNextInputSplitWithGlobalFailover() throws Exception {
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0L));
        this.configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        runRequestNextInputSplitTest(this::flattenCollection);
    }

    private void runRequestNextInputSplitTest(Function<List<List<InputSplit>>, Collection<InputSplit>> function) throws Exception {
        ArrayList arrayList = new ArrayList(4);
        for (int i = 0; i < 4; i++) {
            arrayList.add(new TestingInputSplit(i));
        }
        TestingInputSplitSource testingInputSplitSource = new TestingInputSplitSource(arrayList);
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setParallelism(2);
        jobVertex.setInputSplitSource(testingInputSplitSource);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        JobGraph build = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).build();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(build, 100, 0L);
        JobMaster createJobMaster = new JobMasterBuilder(build, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway jobMasterGateway = (JobMasterGateway) createJobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, build.getJobID(), 2);
            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
            JobVertexID id = jobVertex.getID();
            List<AccessExecution> executions = getExecutions(jobMasterGateway, id);
            ExecutionAttemptID attemptId = executions.get(0).getAttemptId();
            ArrayList arrayList2 = new ArrayList(2);
            Iterator<AccessExecution> it = executions.iterator();
            while (it.hasNext()) {
                arrayList2.add(getInputSplits(2, getInputSplitSupplier(id, jobMasterGateway, it.next().getAttemptId())));
            }
            Assertions.assertThat(flattenCollection(arrayList2)).containsExactlyInAnyOrder((InputSplit[]) arrayList.toArray(EMPTY_TESTING_INPUT_SPLITS));
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.FAILED)).get();
            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
            Assertions.assertThat(getRemainingInputSplits(getInputSplitSupplier(id, jobMasterGateway, getFirstExecution(jobMasterGateway, id).getAttemptId()))).containsExactlyInAnyOrder((InputSplit[]) function.apply(arrayList2).toArray(EMPTY_TESTING_INPUT_SPLITS));
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Nonnull
    private List<InputSplit> flattenCollection(List<List<InputSplit>> list) {
        return (List) list.stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    @Nonnull
    private Supplier<SerializedInputSplit> getInputSplitSupplier(JobVertexID jobVertexID, JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        return () -> {
            return getInputSplit(jobMasterGateway, jobVertexID, executionAttemptID);
        };
    }

    private void waitUntilAllExecutionsAreScheduledOrDeployed(JobMasterGateway jobMasterGateway) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            Collection<AccessExecution> executions = getExecutions(jobMasterGateway);
            return Boolean.valueOf(!executions.isEmpty() && executions.stream().allMatch(accessExecution -> {
                return accessExecution.getState() == ExecutionState.SCHEDULED || accessExecution.getState() == ExecutionState.DEPLOYING;
            }));
        });
    }

    private static AccessExecution getFirstExecution(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID) {
        List<AccessExecution> executions = getExecutions(jobMasterGateway, jobVertexID);
        Assertions.assertThat(executions.size()).isGreaterThanOrEqualTo(1);
        return executions.get(0);
    }

    private static Collection<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway) {
        return (Collection) requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph().getAllVertices().values().stream().flatMap(accessExecutionJobVertex -> {
            return Arrays.stream(accessExecutionJobVertex.getTaskVertices());
        }).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).collect(Collectors.toList());
    }

    private static List<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID) {
        return (List) ((List) Optional.ofNullable((AccessExecutionJobVertex) requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph().getAllVertices().get(jobVertexID)).map(accessExecutionJobVertex -> {
            return Arrays.asList(accessExecutionJobVertex.getTaskVertices());
        }).orElse(Collections.emptyList())).stream().map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).collect(Collectors.toList());
    }

    private static ExecutionGraphInfo requestExecutionGraph(JobMasterGateway jobMasterGateway) {
        try {
            return (ExecutionGraphInfo) jobMasterGateway.requestJob(testingTimeout).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Nonnull
    private static List<InputSplit> getInputSplits(int i, Supplier<SerializedInputSplit> supplier) throws Exception {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            SerializedInputSplit serializedInputSplit = supplier.get();
            Assertions.assertThat(serializedInputSplit.isEmpty()).isFalse();
            arrayList.add((InputSplit) InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), ClassLoader.getSystemClassLoader()));
        }
        return arrayList;
    }

    private List<InputSplit> getRemainingInputSplits(Supplier<SerializedInputSplit> supplier) throws Exception {
        ArrayList arrayList = new ArrayList(16);
        boolean z = true;
        while (z) {
            SerializedInputSplit serializedInputSplit = supplier.get();
            if (serializedInputSplit.isEmpty()) {
                z = false;
            } else {
                InputSplit inputSplit = (InputSplit) InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), ClassLoader.getSystemClassLoader());
                if (inputSplit == null) {
                    z = false;
                } else {
                    arrayList.add(inputSplit);
                }
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static SerializedInputSplit getInputSplit(JobMasterGateway jobMasterGateway, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) {
        try {
            return (SerializedInputSplit) jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    void testRequestPartitionState() throws Exception {
        JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
        JobMaster createJobMaster = new JobMasterBuilder(producerConsumerJobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            CompletableFuture completableFuture = new CompletableFuture();
            TestingTaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                completableFuture.complete(taskDeploymentDescriptor);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            JobMasterGateway jobMasterGateway = (JobMasterGateway) createJobMaster.getSelfGateway(JobMasterGateway.class);
            Assertions.assertThat(registerSlotsAtJobMaster(1, jobMasterGateway, producerConsumerJobGraph.getJobID(), createTestingTaskExecutorGateway, localUnresolvedTaskManagerLocation)).hasSize(1);
            TaskDeploymentDescriptor taskDeploymentDescriptor2 = (TaskDeploymentDescriptor) completableFuture.get();
            Assertions.assertThat(taskDeploymentDescriptor2.getProducedPartitions()).hasSize(1);
            ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = (ResultPartitionDeploymentDescriptor) taskDeploymentDescriptor2.getProducedPartitions().iterator().next();
            ExecutionAttemptID executionAttemptId = taskDeploymentDescriptor2.getExecutionAttemptId();
            ExecutionAttemptID clone = InstantiationUtil.clone(executionAttemptId);
            jobMasterGateway.updateTaskExecutionState(SchedulerTestingUtils.createFinishedTaskExecutionState(executionAttemptId)).get();
            ResultPartitionID resultPartitionID = new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), clone);
            Assertions.assertThat((ExecutionState) jobMasterGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), resultPartitionID).get()).isEqualTo(ExecutionState.FINISHED);
            CompletableFuture requestPartitionState = jobMasterGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), new ResultPartitionID());
            Objects.requireNonNull(requestPartitionState);
            Assertions.assertThatThrownBy(requestPartitionState::get).hasRootCauseInstanceOf(IllegalArgumentException.class);
            CompletableFuture requestPartitionState2 = jobMasterGateway.requestPartitionState(new IntermediateDataSetID(), resultPartitionID);
            Objects.requireNonNull(requestPartitionState2);
            Assertions.assertThatThrownBy(requestPartitionState2::get).hasRootCauseInstanceOf(IllegalArgumentException.class);
            CompletableFuture requestPartitionState3 = jobMasterGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), ExecutionGraphTestUtils.createExecutionAttemptId()));
            Objects.requireNonNull(requestPartitionState3);
            Assertions.assertThatThrownBy(requestPartitionState3::get).hasRootCauseInstanceOf(PartitionProducerDisposedException.class);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void notifyResourceManagerLeaderListeners(TestingResourceManagerGateway testingResourceManagerGateway) {
        this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m373getFencingToken().toUUID());
    }

    @Test
    void testTriggerSavepointTimeout() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withFatalErrorHandler(this.testingFatalErrorHandler).withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(TestingSchedulerNG.newBuilder().setTriggerSavepointFunction((str, bool, savepointFormatType) -> {
            return new CompletableFuture();
        }).build()))).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture triggerSavepoint = selfGateway.triggerSavepoint("/tmp", false, SavepointFormatType.CANONICAL, Duration.ofMillis(fastHeartbeatInterval));
            CompletableFuture triggerSavepoint2 = selfGateway.triggerSavepoint("/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT);
            Assertions.assertThatThrownBy(() -> {
                triggerSavepoint.get(testingTimeout.toMillis(), TimeUnit.MILLISECONDS);
            }).hasRootCauseInstanceOf(TimeoutException.class);
            Assertions.assertThat(triggerSavepoint2).isNotDone();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        JobGraph createSingleVertexJobWithRestartStrategy = createSingleVertexJobWithRestartStrategy();
        JobMaster createJobMaster = new JobMasterBuilder(createSingleVertexJobWithRestartStrategy, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = new CompletableFuture();
            TaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, th) -> {
                completableFuture2.complete(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).setDisconnectJobManagerConsumer((jobID, th2) -> {
                completableFuture.complete(jobID);
            }).createTestingTaskExecutorGateway();
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> registerSlotsAtJobMaster = registerSlotsAtJobMaster(1, selfGateway, createSingleVertexJobWithRestartStrategy.getJobID(), createTestingTaskExecutorGateway, localUnresolvedTaskManagerLocation);
            Assertions.assertThat(registerSlotsAtJobMaster).hasSize(1);
            AllocationID allocationId = registerSlotsAtJobMaster.iterator().next().getAllocationId();
            selfGateway.failSlot(localUnresolvedTaskManagerLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));
            Assertions.assertThat((AllocationID) completableFuture2.get()).isEqualTo(allocationId);
            Assertions.assertThat((JobID) completableFuture.get()).isEqualTo(createSingleVertexJobWithRestartStrategy.getJobID());
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th3) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
        new TestingJobManagerSharedServicesBuilder().build();
        JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        TestingJobMasterPartitionTracker testingJobMasterPartitionTracker = new TestingJobMasterPartitionTracker();
        testingJobMasterPartitionTracker.setIsTrackingPartitionsForFunction(resourceID -> {
            return Boolean.valueOf(atomicBoolean.get());
        });
        JobMaster createJobMaster = new JobMasterBuilder(singleNoOpJobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).withPartitionTrackerFactory(taskExecutorGatewayLookup -> {
            return testingJobMasterPartitionTracker;
        }).createJobMaster();
        try {
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = new CompletableFuture();
            TaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, th) -> {
                completableFuture2.complete(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).setDisconnectJobManagerConsumer((jobID, th2) -> {
                completableFuture.complete(jobID);
            }).createTestingTaskExecutorGateway();
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> registerSlotsAtJobMaster = registerSlotsAtJobMaster(1, selfGateway, singleNoOpJobGraph.getJobID(), createTestingTaskExecutorGateway, localUnresolvedTaskManagerLocation);
            Assertions.assertThat(registerSlotsAtJobMaster).hasSize(1);
            AllocationID allocationId = registerSlotsAtJobMaster.iterator().next().getAllocationId();
            selfGateway.failSlot(localUnresolvedTaskManagerLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));
            Assertions.assertThat((AllocationID) completableFuture2.get()).isEqualTo(allocationId);
            selfGateway.requestJobStatus(Duration.ofSeconds(5L)).get();
            Assertions.assertThat(completableFuture).isNotDone();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th3) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    void testJobMasterAggregatesValuesCorrectly() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            AggregateFunction<Integer, Integer, Integer> createAggregateFunction = createAggregateFunction();
            ClosureCleaner.clean(createAggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            byte[] serializeObject = InstantiationUtil.serializeObject(createAggregateFunction);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg1", 1, serializeObject).get()).isEqualTo(1);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg1", 2, serializeObject).get()).isEqualTo(3);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg1", 3, serializeObject).get()).isEqualTo(6);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg1", 4, serializeObject).get()).isEqualTo(10);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg2", 10, serializeObject).get()).isEqualTo(10);
            Assertions.assertThat(selfGateway.updateGlobalAggregate("agg2", 23, serializeObject).get()).isEqualTo(33);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private AggregateFunction<Integer, Integer, Integer> createAggregateFunction() {
        return new AggregateFunction<Integer, Integer, Integer>() { // from class: org.apache.flink.runtime.jobmaster.JobMasterTest.1
            /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
            public Integer m225createAccumulator() {
                return 0;
            }

            public Integer add(Integer num, Integer num2) {
                return Integer.valueOf(num2.intValue() + num.intValue());
            }

            public Integer getResult(Integer num) {
                return num;
            }

            public Integer merge(Integer num, Integer num2) {
                return add(num, num2);
            }
        };
    }

    @Nonnull
    private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() {
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        return testingResourceManagerGateway;
    }

    @Test
    void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
        runJobFailureWhenTaskExecutorTerminatesTest(heartbeatServices, (localUnresolvedTaskManagerLocation, jobMasterGateway) -> {
            jobMasterGateway.disconnectTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new FlinkException("Test disconnectTaskManager exception."));
        });
    }

    @Test
    void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
        TestingHeartbeatServices testingHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
        runJobFailureWhenTaskExecutorTerminatesTest(testingHeartbeatServices, (localUnresolvedTaskManagerLocation, jobMasterGateway) -> {
            testingHeartbeatServices.triggerHeartbeatTimeout(this.jmResourceId, localUnresolvedTaskManagerLocation.getResourceID());
        });
    }

    @Test
    void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster();
        try {
            createJobMaster.start();
            Assertions.assertThat((RegistrationResponse) createJobMaster.registerTaskManager(new JobID(), TaskManagerRegistrationInformation.create("foobar", new LocalUnresolvedTaskManagerLocation(), TestingUtils.zeroUUID()), testingTimeout).get()).isInstanceOf(JMTMRegistrationRejection.class);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster();
        try {
            RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
            createJobMaster.start();
            TaskManagerRegistrationInformation create = TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway.getAddress(), new LocalUnresolvedTaskManagerLocation(), UUID.randomUUID());
            CompletableFuture registerTaskManager = createJobMaster.registerTaskManager(jobGraph.getJobID(), create, testingTimeout);
            CompletableFuture registerTaskManager2 = createJobMaster.registerTaskManager(jobGraph.getJobID(), create, testingTimeout);
            Assertions.assertThat((RegistrationResponse) registerTaskManager.get()).isInstanceOf(JMTMRegistrationSuccess.class);
            Assertions.assertThat((RegistrationResponse) registerTaskManager2.get()).isInstanceOf(JMTMRegistrationSuccess.class);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster();
        try {
            CompletableFuture completableFuture = new CompletableFuture();
            RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress("firstTaskExecutor").setDisconnectJobManagerConsumer((jobID, th) -> {
                completableFuture.complete(null);
            }).createTestingTaskExecutorGateway();
            RpcGateway createTestingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().setAddress("secondTaskExecutor").createTestingTaskExecutorGateway();
            rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
            rpcService.registerGateway(createTestingTaskExecutorGateway2.getAddress(), createTestingTaskExecutorGateway2);
            createJobMaster.start();
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            Assertions.assertThat((RegistrationResponse) createJobMaster.registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway.getAddress(), localUnresolvedTaskManagerLocation, UUID.randomUUID()), testingTimeout).get()).isInstanceOf(JMTMRegistrationSuccess.class);
            Assertions.assertThat((RegistrationResponse) createJobMaster.registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(createTestingTaskExecutorGateway2.getAddress(), localUnresolvedTaskManagerLocation, UUID.randomUUID()), testingTimeout).get()).isInstanceOf(JMTMRegistrationSuccess.class);
            completableFuture.get();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th2) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Test
    void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(TestingSchedulerNG.newBuilder().setCloseAsyncSupplier(() -> {
            return completableFuture;
        }).build()))).createJobMaster();
        try {
            createJobMaster.start();
            CompletableFuture closeAsync = createJobMaster.closeAsync();
            Assertions.assertThatThrownBy(() -> {
                closeAsync.get(fastHeartbeatTimeout, TimeUnit.MILLISECONDS);
            }).as("Expected TimeoutException because the JobMaster should not terminate.", new Object[0]).isInstanceOf(TimeoutException.class);
            completableFuture.complete(null);
            closeAsync.get();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY, RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(fastHeartbeatInterval));
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            registerSlotsAtJobMaster(1, selfGateway, jobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setAddress("firstTaskManager").createTestingTaskExecutorGateway(), localUnresolvedTaskManagerLocation);
            CommonTestUtils.waitUntilCondition(() -> {
                return Boolean.valueOf(selfGateway.requestJobStatus(testingTimeout).get() == JobStatus.RUNNING);
            });
            selfGateway.disconnectTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new FlinkException("Test exception."));
            CommonTestUtils.waitUntilCondition(() -> {
                return Boolean.valueOf(selfGateway.requestJobStatus(testingTimeout).get() == JobStatus.RESTARTING);
            });
            Assertions.assertThat(registerSlotsAtJobMaster(1, selfGateway, jobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setAddress("secondTaskManager").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation())).hasSize(1);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testGetPartitionWithMetrics() throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);
        JobGraph batchJobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        JobMaster createJobMaster = new JobMasterBuilder(batchJobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway jobMasterGateway = (JobMasterGateway) createJobMaster.getSelfGateway(JobMasterGateway.class);
            DefaultShuffleMetrics defaultShuffleMetrics = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[]{fastHeartbeatInterval, 2, 3}));
            NettyShuffleDescriptor buildLocal = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
            List singletonList = Collections.singletonList(new DefaultPartitionWithMetrics(buildLocal, defaultShuffleMetrics));
            DefaultShuffleMetrics defaultShuffleMetrics2 = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[]{4, 5, 6}));
            NettyShuffleDescriptor buildLocal2 = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
            List singletonList2 = Collections.singletonList(new DefaultPartitionWithMetrics(buildLocal2, defaultShuffleMetrics2));
            DefaultShuffleMetrics defaultShuffleMetrics3 = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[]{7, 8, 9}));
            NettyShuffleDescriptor buildLocal3 = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
            List singletonList3 = Collections.singletonList(new DefaultPartitionWithMetrics(buildLocal3, defaultShuffleMetrics3));
            DefaultShuffleMetrics defaultShuffleMetrics4 = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[]{fastHeartbeatTimeout, 11}));
            NettyShuffleDescriptor buildLocal4 = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
            List singletonList4 = Collections.singletonList(new DefaultPartitionWithMetrics(buildLocal4, defaultShuffleMetrics4));
            registerSlotsAtJobMaster(1, jobMasterGateway, batchJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setRequestPartitionWithMetricsFunction(jobID -> {
                return CompletableFuture.completedFuture(singletonList);
            }).setAddress("tm1").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            createJobMaster.startFetchAndRetainPartitionWithMetricsOnTaskManager();
            verifyPartitionMetrics(createJobMaster.getPartitionWithMetricsOnTaskManagers(), singletonList);
            registerSlotsAtJobMaster(1, jobMasterGateway, batchJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setRequestPartitionWithMetricsFunction(jobID2 -> {
                return CompletableFuture.completedFuture(singletonList2);
            }).setAddress("tm2").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            ArrayList arrayList = new ArrayList(singletonList);
            arrayList.addAll(singletonList2);
            verifyPartitionMetrics(createJobMaster.getPartitionWithMetricsOnTaskManagers(), arrayList);
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = new CompletableFuture();
            registerSlotsAtJobMaster(1, jobMasterGateway, batchJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setRequestPartitionWithMetricsFunction(jobID3 -> {
                return completableFuture2;
            }).setReleasePartitionsConsumer((jobID4, set) -> {
                completableFuture.complete(Tuple2.of(jobID4, set));
            }).setAddress("tm3").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            registerSlotsAtJobMaster(1, jobMasterGateway, batchJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setRequestPartitionWithMetricsFunction(jobID5 -> {
                return CompletableFuture.completedFuture(singletonList4);
            }).setAddress("tm4").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            Duration ofSeconds = Duration.ofSeconds(fastHeartbeatTimeout);
            HashSet hashSet = new HashSet();
            hashSet.add(buildLocal.getResultPartitionID());
            hashSet.add(buildLocal2.getResultPartitionID());
            hashSet.add(buildLocal4.getResultPartitionID());
            CompletableFuture partitionWithMetrics = jobMasterGateway.getPartitionWithMetrics(ofSeconds, hashSet);
            ArrayList arrayList2 = new ArrayList(singletonList);
            arrayList2.addAll(singletonList2);
            arrayList2.addAll(singletonList4);
            Assertions.assertThat(partitionWithMetrics).succeedsWithin(ofSeconds);
            verifyPartitionMetrics((Map) ((Collection) partitionWithMetrics.get()).stream().collect(Collectors.toMap(partitionWithMetrics2 -> {
                return partitionWithMetrics2.getPartition().getResultPartitionID();
            }, partitionWithMetrics3 -> {
                return partitionWithMetrics3;
            })), arrayList2);
            completableFuture2.complete(singletonList3);
            Assertions.assertThat((Tuple2) completableFuture.get()).isEqualTo(Tuple2.of(batchJobGraph.getJobID(), Collections.singleton(buildLocal3.getResultPartitionID())));
            CompletableFuture completableFuture3 = new CompletableFuture();
            registerSlotsAtJobMaster(1, jobMasterGateway, batchJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setRequestPartitionWithMetricsFunction(jobID6 -> {
                completableFuture3.complete(null);
                return completableFuture3;
            }).setAddress("tm5").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            Assertions.assertThatThrownBy(() -> {
                completableFuture3.get(ofSeconds.toMillis(), TimeUnit.MILLISECONDS);
            }).isInstanceOf(TimeoutException.class);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void verifyPartitionMetrics(Map<ResultPartitionID, PartitionWithMetrics> map, Collection<PartitionWithMetrics> collection) {
        Map map2 = (Map) collection.stream().collect(Collectors.toMap(partitionWithMetrics -> {
            return partitionWithMetrics.getPartition().getResultPartitionID();
        }, partitionWithMetrics2 -> {
            return partitionWithMetrics2;
        }));
        Assertions.assertThat(map).hasSameSizeAs(map2);
        map.forEach((resultPartitionID, partitionWithMetrics3) -> {
            PartitionWithMetrics partitionWithMetrics3 = (PartitionWithMetrics) map2.get(resultPartitionID);
            Assertions.assertThat(partitionWithMetrics3).isNotNull();
            Assertions.assertThat(partitionWithMetrics3.getPartitionMetrics().getPartitionBytes().getSubpartitionBytes()).isEqualTo(partitionWithMetrics3.getPartitionMetrics().getPartitionBytes().getSubpartitionBytes());
            Assertions.assertThat(partitionWithMetrics3.getPartition().isUnknown()).isEqualTo(partitionWithMetrics3.getPartition().isUnknown());
            Assertions.assertThat(partitionWithMetrics3.getPartition().storesLocalResourcesOn()).isEqualTo(partitionWithMetrics3.getPartition().storesLocalResourcesOn());
        });
    }

    @Test
    void testBlockResourcesWillTriggerReleaseFreeSlots() throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        JobGraph batchJobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, th) -> {
            if (!completableFuture.isDone()) {
                completableFuture.complete(allocationID);
            } else if (!completableFuture2.isDone()) {
                completableFuture2.complete(allocationID);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        JobMaster createJobMaster = new JobMasterBuilder(batchJobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway gateway = createJobMaster.getGateway();
            Assertions.assertThat(registerSlotsAtJobMaster(2, gateway, batchJobGraph.getJobID(), createTestingTaskExecutorGateway, localUnresolvedTaskManagerLocation)).hasSize(2);
            waitUntilAllExecutionsAreScheduledOrDeployed(gateway);
            gateway.updateTaskExecutionState(SchedulerTestingUtils.createFinishedTaskExecutionState(getExecutions(gateway).iterator().next().getAttemptId())).get();
            gateway.notifyNewBlockedNodes(Collections.singleton(new BlockedNode(localUnresolvedTaskManagerLocation.getNodeId(), "Test cause", System.currentTimeMillis()))).get();
            Assertions.assertThat(completableFuture).isDone();
            Assertions.assertThat(completableFuture2).isNotDone();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th2) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Test
    void testNewlyAddedBlockedNodesWillBeSynchronizedToResourceManager() throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway gateway = createJobMaster.getGateway();
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
            CompletableFuture<Collection<BlockedNode>> completableFuture3 = new CompletableFuture<>();
            CompletableFuture<Collection<BlockedNode>> completableFuture4 = new CompletableFuture<>();
            CompletableFuture<Collection<BlockedNode>> completableFuture5 = new CompletableFuture<>();
            TestingResourceManagerGateway createResourceManagerGateway = createResourceManagerGateway(completableFuture3, completableFuture4, completableFuture);
            TestingResourceManagerGateway createResourceManagerGateway2 = createResourceManagerGateway(completableFuture5, new CompletableFuture<>(), completableFuture2);
            notifyResourceManagerLeaderListeners(createResourceManagerGateway);
            completableFuture.get();
            BlockedNode blockedNode = new BlockedNode("node1", "Test exception", Long.MAX_VALUE);
            gateway.notifyNewBlockedNodes(Collections.singleton(blockedNode)).get();
            Assertions.assertThat(completableFuture3.get()).containsExactly(new BlockedNode[]{blockedNode});
            notifyResourceManagerLeaderListeners(createResourceManagerGateway2);
            completableFuture2.get();
            BlockedNode blockedNode2 = new BlockedNode("node2", "Test exception", Long.MAX_VALUE);
            gateway.notifyNewBlockedNodes(Collections.singleton(blockedNode2)).get();
            Assertions.assertThat(completableFuture5.get()).containsExactlyInAnyOrder(new BlockedNode[]{blockedNode, blockedNode2});
            Assertions.assertThat(completableFuture4).isNotDone();
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testSuccessfulResourceRequirementsUpdate() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingSchedulerNG.Builder newBuilder = TestingSchedulerNG.newBuilder();
        Objects.requireNonNull(completableFuture);
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(newBuilder.setUpdateJobResourceRequirementsConsumer((v1) -> {
            r1.complete(v1);
        }).build()))).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            JobResourceRequirements.Builder newBuilder2 = JobResourceRequirements.newBuilder();
            Iterator it = jobGraph.getVertices().iterator();
            while (it.hasNext()) {
                newBuilder2.setParallelismForJobVertex(((JobVertex) it.next()).getID(), 1, 2);
            }
            JobResourceRequirements build = newBuilder2.build();
            FlinkAssertions.assertThatFuture(selfGateway.updateJobResourceRequirements(build)).eventuallySucceeds();
            FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds().isEqualTo(build);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testResourceRequirementsAreRequestedFromTheScheduler() throws Exception {
        JobResourceRequirements empty = JobResourceRequirements.empty();
        JobMaster createJobMaster = new JobMasterBuilder(jobGraph, rpcService).withSlotPoolServiceSchedulerFactory(DefaultSlotPoolServiceSchedulerFactory.create(TestingSlotPoolServiceBuilder.newBuilder(), new TestingSchedulerNGFactory(TestingSchedulerNG.newBuilder().setRequestJobResourceRequirementsSupplier(() -> {
            return empty;
        }).build()))).createJobMaster();
        try {
            createJobMaster.start();
            FlinkAssertions.assertThatFuture(createJobMaster.getSelfGateway(JobMasterGateway.class).requestJobResourceRequirements()).eventuallySucceeds().isEqualTo(empty);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRetrievingCheckpointStats() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(2, SavepointRestoreSettings.forPath(createSavepoint(42L).getAbsolutePath(), true));
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        this.haServices.setCheckpointRecoveryFactory(PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(i -> {
            return standaloneCompletedCheckpointStore;
        }));
        JobMaster createJobMaster = new JobMasterBuilder(createJobGraphWithCheckpointing, rpcService).withHighAvailabilityServices(this.haServices).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway gateway = createJobMaster.getGateway();
            registerSlotsAtJobMaster(2, gateway, createJobGraphWithCheckpointing.getJobID(), new TestingTaskExecutorGatewayBuilder().setAddress("firstTaskManager").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation());
            CommonTestUtils.waitUntilCondition(() -> {
                return Boolean.valueOf(gateway.requestJobStatus(testingTimeout).get() == JobStatus.RUNNING);
            });
            Assertions.assertThat(((CheckpointStatsSnapshot) createJobMaster.getGateway().requestCheckpointStats(testingTimeout).get()).getLatestRestoredCheckpoint().getCheckpointId()).isEqualTo(42L);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private TestingResourceManagerGateway createResourceManagerGateway(CompletableFuture<Collection<BlockedNode>> completableFuture, CompletableFuture<Collection<BlockedNode>> completableFuture2, CompletableFuture<Void> completableFuture3) {
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        testingResourceManagerGateway.setNotifyNewBlockedNodesFunction(collection -> {
            if (!completableFuture.isDone()) {
                completableFuture.complete(collection);
            } else if (!completableFuture2.isDone()) {
                completableFuture2.complete(collection);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        testingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, str, jobID) -> {
            completableFuture3.complete(null);
            return CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess());
        });
        return testingResourceManagerGateway;
    }

    private void runJobFailureWhenTaskExecutorTerminatesTest(HeartbeatServices heartbeatServices2, BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> biConsumer) throws Exception {
        JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        JobMasterBuilder.TestingOnCompletionActions testingOnCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        JobMaster createJobMaster = new JobMasterBuilder(singleNoOpJobGraph, rpcService).withResourceId(this.jmResourceId).withHighAvailabilityServices(this.haServices).withHeartbeatServices(heartbeatServices2).withOnCompletionActions(testingOnCompletionActions).createJobMaster();
        try {
            createJobMaster.start();
            JobMasterGateway jobMasterGateway = (JobMasterGateway) createJobMaster.getSelfGateway(JobMasterGateway.class);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            CompletableFuture completableFuture = new CompletableFuture();
            Assertions.assertThat(registerSlotsAtJobMaster(1, jobMasterGateway, singleNoOpJobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                completableFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway(), localUnresolvedTaskManagerLocation)).hasSize(1);
            ExecutionAttemptID executionAttemptID = (ExecutionAttemptID) completableFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.INITIALIZING)).get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.RUNNING)).get();
            biConsumer.accept(localUnresolvedTaskManagerLocation, jobMasterGateway);
            Assertions.assertThat(testingOnCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getArchivedExecutionGraph().getState()).isEqualTo(JobStatus.FAILED);
            if (createJobMaster != null) {
                createJobMaster.close();
            }
        } catch (Throwable th) {
            if (createJobMaster != null) {
                try {
                    createJobMaster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Collection<SlotOffer> registerSlotsAtJobMaster(int i, JobMasterGateway jobMasterGateway, JobID jobID, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws ExecutionException, InterruptedException {
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
        jobMasterGateway.registerTaskManager(jobID, TaskManagerRegistrationInformation.create(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, TestingUtils.zeroUUID()), testingTimeout).get();
        return (Collection) jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), (Collection) IntStream.range(0, i).mapToObj(i2 -> {
            return new SlotOffer(new AllocationID(), i2, ResourceProfile.ANY);
        }).collect(Collectors.toList()), testingTimeout).get();
    }

    private JobGraph producerConsumerJobGraph() {
        JobVertex jobVertex = new JobVertex("Producer");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);
        JobVertex jobVertex2 = new JobVertex("Consumer");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(1);
        JobVertexConnectionUtils.connectNewDataSetAsInput(jobVertex2, jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        return JobGraphTestUtils.batchJobGraph(jobVertex, jobVertex2);
    }

    private File createSavepoint(long j) throws IOException {
        return TestUtils.createSavepointWithOperatorState(Files.createTempFile(this.temporaryFolder, UUID.randomUUID().toString(), "", new FileAttribute[0]).toFile(), j, new OperatorID[0]);
    }

    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        return createJobGraphWithCheckpointing(1, savepointRestoreSettings);
    }

    private JobGraph createJobGraphWithCheckpointing(int i, SavepointRestoreSettings savepointRestoreSettings) {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, jobVertex);
    }

    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        JobGraph singleNoOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(singleNoOpJobGraph, Integer.MAX_VALUE, 0L);
        return singleNoOpJobGraph;
    }

    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway, JobID jobID, int i) throws ExecutionException, InterruptedException {
        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(rpcService, jobMasterGateway, jobID, i, new TestingTaskExecutorGatewayBuilder().setCancelTaskFunction(executionAttemptID -> {
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED));
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway(), testingTimeout);
    }
}
