/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TestingJobManagerSharedServicesBuilder;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterPartitionReleaseTest
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Time testingTimeout = Time.seconds((long)10L);
    private static TestingRpcService rpcService;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /**
     * Creates the RPC service instance that is shared by all tests of this class.
     */
    @BeforeClass
    public static void setupClass() {
        JobMasterPartitionReleaseTest.rpcService = new TestingRpcService();
    }

    /**
     * Installs a fresh fatal error handler before every test.
     *
     * @throws IOException declared for interface compatibility; nothing in the body throws it
     */
    @Before
    public void setup() throws IOException {
        testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /**
     * Per-test cleanup: asks the fatal error handler (if one was created) to rethrow
     * whatever error it holds, and clears the gateways registered at the shared RPC service.
     *
     * <p>The gateways are cleared in a {@code finally} block because {@code rpcService} is
     * static and shared between tests; without it, a rethrown error would leave this test's
     * gateways registered for the tests that run afterwards.
     *
     * @throws Exception if the fatal error handler rethrows an error
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            // Must run even when rethrowError() throws, otherwise stale gateways leak across tests.
            rpcService.clearGateways();
        }
    }

    /**
     * Stops the shared RPC service once all tests have run and drops the reference to it.
     * Does nothing when the service was never created.
     */
    @AfterClass
    public static void teardownClass() {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService();
        rpcService = null;
    }

    /**
     * Disconnects the task executor from the job master and verifies that the partition
     * tracker's stop-tracking-all-partitions consumer is invoked with the resource id of
     * that task executor.
     *
     * @throws Exception if the test setup fails or one of the awaited futures fails
     */
    @Test
    public void testPartitionTableCleanupOnDisconnect() throws Exception {
        // Completed with the job id once the job master tells the task executor to disconnect.
        final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID))
                .createTestingTaskExecutorGateway();
        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, testingTaskExecutorGateway)) {
            // Use the fixture's accessor, like the other tests, instead of reaching into its field.
            final JobMasterGateway jobMasterGateway = testSetup.getJobMasterGateway();
            jobMasterGateway.disconnectTaskManager(testSetup.getTaskExecutorResourceID(), new Exception("test"));
            // Wait until the disconnect has reached the task executor before asserting.
            disconnectTaskExecutorFuture.get();
            MatcherAssert.assertThat(
                    testSetup.getStopTrackingPartitionsTargetResourceId().get(),
                    Matchers.equalTo(testSetup.getTaskExecutorResourceID()));
        }
    }

    /**
     * Runs the job-termination scenario with a {@code FINISHED} execution and expects the
     * release-or-promote future of the test setup to be completed.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testPartitionReleaseOrPromotionOnJobSuccess() throws Exception {
        final Function<TestSetup, CompletableFuture<ResourceID>> expectedCall =
                TestSetup::getReleaseOrPromotePartitionsTargetResourceId;
        testPartitionReleaseOrPromotionOnJobTermination(expectedCall, ExecutionState.FINISHED);
    }

    /**
     * Runs the job-termination scenario with a {@code FAILED} execution and expects the
     * release future of the test setup to be completed.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testPartitionReleaseOrPromotionOnJobFailure() throws Exception {
        final Function<TestSetup, CompletableFuture<ResourceID>> expectedCall =
                TestSetup::getReleasePartitionsTargetResourceId;
        testPartitionReleaseOrPromotionOnJobTermination(expectedCall, ExecutionState.FAILED);
    }

    /**
     * Shared scenario for the job-termination tests: waits for the task deployment descriptor
     * submitted to the task executor, reports {@code finalExecutionState} for that execution
     * attempt to the job master, and asserts that the future selected by
     * {@code taskExecutorCallSelector} completes with the task executor's resource id.
     *
     * @param taskExecutorCallSelector picks the {@link TestSetup} future that is expected to complete
     * @param finalExecutionState execution state reported for the deployed task
     * @throws Exception if the test setup fails or one of the awaited futures fails
     */
    private void testPartitionReleaseOrPromotionOnJobTermination(Function<TestSetup, CompletableFuture<ResourceID>> taskExecutorCallSelector, ExecutionState finalExecutionState) throws Exception {
        // Typed future so the descriptor can be read back without an unchecked cast.
        final CompletableFuture<TaskDeploymentDescriptor> taskDeploymentDescriptorFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setSubmitTaskConsumer((tdd, ignored) -> {
                    taskDeploymentDescriptorFuture.complete(tdd);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
        try (TestSetup testSetup = new TestSetup(rpcService, this.testingFatalErrorHandler, testingTaskExecutorGateway)) {
            final JobMasterGateway jobMasterGateway = testSetup.getJobMasterGateway();
            // Blocks until the job master has submitted the task to the task executor.
            final TaskDeploymentDescriptor taskDeploymentDescriptor = taskDeploymentDescriptorFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(
                    taskDeploymentDescriptor.getJobId(),
                    taskDeploymentDescriptor.getExecutionAttemptId(),
                    finalExecutionState));
            MatcherAssert.assertThat(
                    taskExecutorCallSelector.apply(testSetup).get(),
                    Matchers.equalTo(testSetup.getTaskExecutorResourceID()));
        }
    }

    /**
     * Test fixture that creates and starts a {@link JobMaster} wired to a
     * {@link TestingJobMasterPartitionTracker}, registers the given task executor at it and
     * offers one slot. The partition tracker's consumers complete the futures exposed by the
     * getters of this class with the resource id they are called with.
     *
     * <p>Closing the fixture terminates the job master and deletes its temporary folder.
     */
    private static class TestSetup
    implements AutoCloseable {
        private final TemporaryFolder temporaryFolder = new TemporaryFolder();
        private final LocalUnresolvedTaskManagerLocation localTaskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        private final CompletableFuture<ResourceID> taskExecutorIdForStopTracking = new CompletableFuture<>();
        private final CompletableFuture<ResourceID> taskExecutorIdForPartitionRelease = new CompletableFuture<>();
        private final CompletableFuture<ResourceID> taskExecutorIdForPartitionReleaseOrPromote = new CompletableFuture<>();
        // Stays null until the job master has been created; close() checks for that.
        private JobMaster jobMaster;

        /**
         * Builds and starts the job master and registers the task executor at it.
         *
         * <p>If any step fails, the resources created so far are released before the failure is
         * rethrown, because try-with-resources does not call {@link #close()} when the
         * constructor itself throws.
         *
         * @param rpcService RPC service used by the job master and for gateway registration
         * @param fatalErrorHandler fatal error handler handed to the job master
         * @param taskExecutorGateway task executor gateway to register at the job master
         * @throws Exception if creating, starting or wiring the job master fails
         */
        public TestSetup(TestingRpcService rpcService, FatalErrorHandler fatalErrorHandler, TaskExecutorGateway taskExecutorGateway) throws Exception {
            try {
                this.temporaryFolder.create();

                TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
                haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
                SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
                haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

                // Each tracker callback completes the matching future with the resource id it receives.
                TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
                partitionTracker.setStopTrackingAllPartitionsConsumer(this.taskExecutorIdForStopTracking::complete);
                partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(this.taskExecutorIdForPartitionRelease::complete);
                partitionTracker.setStopTrackingAndReleaseOrPromotePartitionsConsumer(this.taskExecutorIdForPartitionReleaseOrPromote::complete);

                Configuration configuration = new Configuration();
                configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
                HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 5000000L);
                JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();

                this.jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                        .withConfiguration(configuration)
                        .withHighAvailabilityServices(haServices)
                        .withJobManagerSharedServices(new TestingJobManagerSharedServicesBuilder().build())
                        .withFatalErrorHandler(fatalErrorHandler)
                        .withHeartbeatServices(heartbeatServices)
                        .withPartitionTrackerFactory(ignored -> partitionTracker)
                        .createJobMaster();
                this.jobMaster.start(JobMasterId.generate()).get();
                this.registerTaskExecutorAtJobMaster(rpcService, this.getJobMasterGateway(), jobGraph.getJobID(), taskExecutorGateway, rmLeaderRetrievalService);
            } catch (Throwable failure) {
                // Release the temp folder / job master ourselves; nobody else holds a reference yet.
                try {
                    this.close();
                } catch (Throwable cleanupFailure) {
                    failure.addSuppressed(cleanupFailure);
                }
                throw failure;
            }
        }

        /**
         * Registers a resource manager gateway and the task executor gateway at the RPC service,
         * announces the resource manager as leader, registers the task executor at the job master
         * and offers it a slot for the allocation id taken from the resource manager gateway.
         *
         * @throws ExecutionException if the registration or the slot offer fails
         * @throws InterruptedException if interrupted while waiting for either of them
         */
        private void registerTaskExecutorAtJobMaster(TestingRpcService rpcService, JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway, SettableLeaderRetrievalService rmLeaderRetrievalService) throws ExecutionException, InterruptedException {
            AllocationIdsResourceManagerGateway resourceManagerGateway = new AllocationIdsResourceManagerGateway();
            rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
            rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());

            rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
            jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), this.localTaskManagerUnresolvedLocation, jobId, testingTimeout).get();

            // Blocks until the resource manager gateway has recorded an allocation id.
            AllocationID allocationId = resourceManagerGateway.takeAllocationId();
            Set<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN));
            jobMasterGateway.offerSlots(this.localTaskManagerUnresolvedLocation.getResourceID(), slotOffers, testingTimeout).get();
        }

        /** Returns the self gateway of the job master under test. */
        public JobMasterGateway getJobMasterGateway() {
            return this.jobMaster.getSelfGateway(JobMasterGateway.class);
        }

        /** Returns the resource id of the task manager location registered at the job master. */
        public ResourceID getTaskExecutorResourceID() {
            return this.localTaskManagerUnresolvedLocation.getResourceID();
        }

        /** Returns the future completed by the tracker's stop-tracking-all-partitions consumer. */
        public CompletableFuture<ResourceID> getStopTrackingPartitionsTargetResourceId() {
            return this.taskExecutorIdForStopTracking;
        }

        /** Returns the future completed by the tracker's stop-tracking-and-release-all-partitions consumer. */
        public CompletableFuture<ResourceID> getReleasePartitionsTargetResourceId() {
            return this.taskExecutorIdForPartitionRelease;
        }

        /** Returns the future completed by the tracker's stop-tracking-and-release-or-promote consumer. */
        public CompletableFuture<ResourceID> getReleaseOrPromotePartitionsTargetResourceId() {
            return this.taskExecutorIdForPartitionReleaseOrPromote;
        }

        /**
         * Terminates the job master (if it was created) and always deletes the temporary folder.
         *
         * @throws Exception if terminating the job master fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (this.jobMaster != null) {
                    RpcUtils.terminateRpcEndpoint(this.jobMaster, testingTimeout);
                }
            }
            finally {
                this.temporaryFolder.delete();
            }
        }
    }

    /**
     * Resource manager gateway whose request-slot consumer records the allocation id of each
     * slot request in a bounded queue, so a test can retrieve it via {@link #takeAllocationId()}.
     */
    private static final class AllocationIdsResourceManagerGateway
    extends TestingResourceManagerGateway {
        // Holds at most 10 ids; offer() does not block and drops further ids when the queue is full.
        private final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(10);

        private AllocationIdsResourceManagerGateway() {
            this.setRequestSlotConsumer(slotRequest -> this.allocationIds.offer(slotRequest.getAllocationId()));
        }

        /**
         * Blocks until an allocation id has been recorded and returns it.
         *
         * <p>If the waiting thread is interrupted, its interrupt flag is restored before the
         * {@link InterruptedException} is handed to {@code ExceptionUtils.rethrow}.
         *
         * @return the next recorded allocation id
         */
        AllocationID takeAllocationId() {
            try {
                return this.allocationIds.take();
            }
            catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                ExceptionUtils.rethrow(e);
                // Presumably never reached (rethrow is expected to throw); the compiler still needs a return here.
                return null;
            }
        }
    }
}

