/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorBuilder;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Recovery test for {@link TaskExecutor}: verifies that a task executor which is shut down and
 * then re-created with the same resource ID, configuration and working directory reports its
 * previous slot allocation again and offers the allocated slot to the job master.
 */
class TaskExecutorRecoveryTest {
    private final TestingRpcServiceExtension rpcServiceExtension = new TestingRpcServiceExtension();

    @RegisterExtension
    private final EachCallbackWrapper<TestingRpcServiceExtension> eachWrapper =
            new EachCallbackWrapper<>(rpcServiceExtension);

    /**
     * Starts a task executor with two slots and local recovery enabled, allocates one slot for a
     * job, closes the executor, and then starts a second executor built from the same inputs.
     *
     * <p>The second executor's first slot report must show the previously allocated slot with the
     * original job ID and allocation ID (and no job on the other slot), and exactly one slot
     * carrying that allocation ID must be offered to the job master.
     *
     * @param tempDir temporary directory backing the {@link WorkingDirectory} shared by both
     *     task executor instances
     * @throws Exception if building, starting or closing a task executor fails, or the test
     *     thread is interrupted while waiting for a slot report or slot offer
     */
    @Test
    void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) throws Exception {
        final ResourceID resourceId = ResourceID.generate();

        final Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, true);

        // Capture every slot report the task executor sends to the resource manager.
        final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        final ArrayBlockingQueue<TaskExecutorSlotReport> queue = new ArrayBlockingQueue<>(2);
        testingResourceManagerGateway.setSendSlotReportFunction(slotReportInformation -> {
            queue.offer(TaskExecutorSlotReport.create(slotReportInformation.f0, slotReportInformation.f2));
            return CompletableFuture.completedFuture(Acknowledge.get());
        });

        final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);

        final JobID jobId = new JobID();

        final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        highAvailabilityServices.setResourceManagerLeaderRetriever(
                new SettableLeaderRetrievalService(
                        testingResourceManagerGateway.getAddress(),
                        testingResourceManagerGateway.getFencingToken().toUUID()));
        final SettableLeaderRetrievalService jobMasterLeaderRetriever = new SettableLeaderRetrievalService();
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);

        final WorkingDirectory workingDirectory = WorkingDirectory.create(tempDir);

        // First incarnation: wait for the initial slot report, then allocate one slot for the job.
        final TaskExecutor taskExecutor =
                createTaskExecutor(rpcService, highAvailabilityServices, workingDirectory, configuration, resourceId);
        taskExecutor.start();

        final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

        final SlotReport slotReport = queue.take().getSlotReport();
        MatcherAssert.assertThat(slotReport.getNumSlotStatus(), Matchers.is(2));

        final SlotStatus slotStatus = slotReport.iterator().next();
        final SlotID allocatedSlotID = slotStatus.getSlotID();
        final AllocationID allocationId = new AllocationID();

        taskExecutorGateway
                .requestSlot(
                        allocatedSlotID,
                        jobId,
                        allocationId,
                        slotStatus.getResourceProfile(),
                        "localhost",
                        testingResourceManagerGateway.getFencingToken(),
                        Time.seconds(10L))
                .join();

        // Shut the first incarnation down before the recovered one is created.
        taskExecutor.close();

        // Capture the slots which the recovered task executor offers to the job master.
        final ArrayBlockingQueue<Collection<SlotOffer>> offeredSlots = new ArrayBlockingQueue<>(1);
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction((resourceID, slotOffers) -> {
                    offeredSlots.offer(new HashSet<>(slotOffers));
                    return CompletableFuture.completedFuture(slotOffers);
                })
                .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        jobMasterLeaderRetriever.notifyListener(
                jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

        // Second incarnation: built from exactly the same inputs as the first one.
        final TaskExecutor recoveredTaskExecutor =
                createTaskExecutor(rpcService, highAvailabilityServices, workingDirectory, configuration, resourceId);
        try {
            recoveredTaskExecutor.start();

            final TaskExecutorSlotReport recoveredSlotReport = queue.take();
            for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
                if (status.getSlotID().equals(allocatedSlotID)) {
                    MatcherAssert.assertThat(status.getJobID(), Matchers.is(jobId));
                    MatcherAssert.assertThat(status.getAllocationID(), Matchers.is(allocationId));
                } else {
                    MatcherAssert.assertThat(status.getJobID(), Matchers.is(Matchers.nullValue()));
                }
            }

            final Collection<SlotOffer> take = offeredSlots.take();
            MatcherAssert.assertThat(take, Matchers.hasSize(1));

            final SlotOffer offeredSlot = take.iterator().next();
            MatcherAssert.assertThat(offeredSlot.getAllocationId(), Matchers.is(allocationId));
        } finally {
            // Close the recovered executor as well, even if one of the assertions above failed.
            recoveredTaskExecutor.close();
        }
    }

    /** Builds (without starting) a task executor from the given services and settings. */
    private static TaskExecutor createTaskExecutor(
            TestingRpcService rpcService,
            TestingHighAvailabilityServices highAvailabilityServices,
            WorkingDirectory workingDirectory,
            Configuration configuration,
            ResourceID resourceId)
            throws Exception {
        return TaskExecutorBuilder.newBuilder(rpcService, highAvailabilityServices, workingDirectory)
                .setConfiguration(configuration)
                .setResourceId(resourceId)
                .build();
    }

    /** Immutable pair of a task executor's resource ID and a slot report it sent. */
    private static final class TaskExecutorSlotReport {
        private final ResourceID taskExecutorResourceId;
        private final SlotReport slotReport;

        private TaskExecutorSlotReport(ResourceID taskExecutorResourceId, SlotReport slotReport) {
            this.taskExecutorResourceId = taskExecutorResourceId;
            this.slotReport = slotReport;
        }

        /** Returns the resource ID of the task executor which sent the report. */
        public ResourceID getTaskExecutorResourceId() {
            return taskExecutorResourceId;
        }

        /** Returns the captured slot report. */
        public SlotReport getSlotReport() {
            return slotReport;
        }

        /**
         * Creates a new pair.
         *
         * @param taskExecutorResourceId resource ID of the reporting task executor
         * @param slotReport slot report sent by that task executor
         * @return a new {@code TaskExecutorSlotReport} holding both values
         */
        public static TaskExecutorSlotReport create(ResourceID taskExecutorResourceId, SlotReport slotReport) {
            return new TaskExecutorSlotReport(taskExecutorResourceId, slotReport);
        }
    }
}

