/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorService;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TimeUtils;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link TaskManagerRunner}.
 *
 * <p>The tests cover three areas: the result reported through the runner's termination future,
 * the generation of the TaskManager {@link ResourceID} from the configuration and RPC endpoint,
 * and the lifecycle of the TaskManager working directory.
 */
class TaskManagerRunnerTest {
    /** Per-test temporary directory injected by JUnit. */
    @TempDir
    private Path temporaryFolder;

    /** Runner started by the current test, if any; {@link #after()} closes it. */
    private TaskManagerRunner taskManagerRunner;

    /**
     * Closes the runner registered in {@link #taskManagerRunner}, if a test registered one.
     *
     * @throws Exception if closing the runner fails
     */
    @AfterEach
    void after() throws Exception {
        if (taskManagerRunner != null) {
            taskManagerRunner.close();
        }
    }

    /**
     * Reporting a fatal error to a running runner must complete its termination future with
     * {@link TaskManagerRunner.Result#FAILURE}.
     */
    @Test
    void testShouldShutdownOnFatalError() throws Exception {
        Configuration configuration = createConfiguration();
        // Registration timeout far longer than any test run, so that the observed failure is
        // caused by the fatal error below and not by a registration timeout.
        configuration.set(
                TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("42 h"));
        taskManagerRunner = createTaskManagerRunner(configuration);

        taskManagerRunner.onFatalError(new RuntimeException());

        FlinkAssertions.assertThatFuture(taskManagerRunner.getTerminationFuture())
                .eventuallySucceeds()
                .isEqualTo(TaskManagerRunner.Result.FAILURE);
    }

    /**
     * With a registration timeout of only 10 ms the runner must terminate on its own with
     * {@link TaskManagerRunner.Result#FAILURE}.
     */
    @Test
    void testShouldShutdownIfRegistrationWithJobManagerFails() throws Exception {
        Configuration configuration = createConfiguration();
        configuration.set(
                TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));
        taskManagerRunner = createTaskManagerRunner(configuration);

        FlinkAssertions.assertThatFuture(taskManagerRunner.getTerminationFuture())
                .eventuallySucceeds()
                .isEqualTo(TaskManagerRunner.Result.FAILURE);
    }

    /** Metadata configured for the resource ID must be exposed by the generated ID. */
    @Test
    void testGenerateTaskManagerResourceIDWithMetaData() throws Exception {
        Configuration configuration = createConfiguration();
        String metadata = "test";
        configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, metadata);

        ResourceID taskManagerResourceID =
                TaskManagerRunner.getTaskManagerResourceID(configuration, "", -1).unwrap();

        Assertions.assertThat(taskManagerResourceID.getMetadata()).isEqualTo(metadata);
    }

    /**
     * Without configured metadata the generated ID has empty metadata and its
     * string-with-metadata form equals the configured resource ID.
     */
    @Test
    void testGenerateTaskManagerResourceIDWithoutMetaData() throws Exception {
        Configuration configuration = createConfiguration();
        String resourceID = "test";
        configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceID);

        ResourceID taskManagerResourceID =
                TaskManagerRunner.getTaskManagerResourceID(configuration, "", -1).unwrap();

        Assertions.assertThat(taskManagerResourceID.getMetadata()).isEmpty();
        Assertions.assertThat(taskManagerResourceID.getStringWithMetadata()).isEqualTo(resourceID);
    }

    /** A resource ID set in the configuration must be used verbatim. */
    @Test
    void testGenerateTaskManagerResourceIDWithConfig() throws Exception {
        Configuration configuration = createConfiguration();
        String resourceID = "test";
        configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceID);

        ResourceID taskManagerResourceID =
                TaskManagerRunner.getTaskManagerResourceID(configuration, "", -1).unwrap();

        Assertions.assertThat(taskManagerResourceID.getResourceIdString()).isEqualTo(resourceID);
    }

    /**
     * With no configured resource ID and an RPC address and port given, the generated ID must
     * contain {@code address:port}.
     */
    @Test
    void testGenerateTaskManagerResourceIDWithRemoteRpcService() throws Exception {
        Configuration configuration = createConfiguration();
        String rpcAddress = "flink";
        int rpcPort = 9090;

        ResourceID taskManagerResourceID =
                TaskManagerRunner.getTaskManagerResourceID(configuration, rpcAddress, rpcPort)
                        .unwrap();

        Assertions.assertThat(taskManagerResourceID).isNotNull();
        Assertions.assertThat(taskManagerResourceID.getResourceIdString())
                .contains(rpcAddress + ":" + rpcPort);
    }

    /**
     * With no configured resource ID, an empty RPC address and port {@code -1}, the generated ID
     * must contain the local host name.
     */
    @Test
    void testGenerateTaskManagerResourceIDWithLocalRpcService() throws Exception {
        Configuration configuration = createConfiguration();
        String rpcAddress = "";
        int rpcPort = -1;

        ResourceID taskManagerResourceID =
                TaskManagerRunner.getTaskManagerResourceID(configuration, rpcAddress, rpcPort)
                        .unwrap();

        Assertions.assertThat(taskManagerResourceID).isNotNull();
        Assertions.assertThat(taskManagerResourceID.getResourceIdString())
                .contains(InetAddress.getLocalHost().getHostName());
    }

    /**
     * If the task executor service's termination future completes while the runner has not been
     * closed, the runner must terminate with {@link TaskManagerRunner.Result#FAILURE}.
     */
    @Test
    void testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        TestingTaskExecutorService taskExecutorService =
                TestingTaskExecutorService.newBuilder()
                        .setTerminationFuture(terminationFuture)
                        .build();
        // Stored in the field (not a shadowing local) so that after() closes the runner even
        // when the assertion below fails.
        taskManagerRunner =
                createTaskManagerRunner(
                        createConfiguration(),
                        createTaskExecutorServiceFactory(taskExecutorService));

        terminationFuture.complete(null);

        FlinkAssertions.assertThatFuture(taskManagerRunner.getTerminationFuture())
                .eventuallySucceeds()
                .isEqualTo(TaskManagerRunner.Result.FAILURE);
    }

    /**
     * If the task executor service's termination future completes only after the runner was asked
     * to close, the runner must still terminate with {@link TaskManagerRunner.Result#SUCCESS}.
     */
    @Test
    void testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        // NOTE(review): withManualTerminationFutureCompletion() presumably keeps the testing
        // service from completing the future itself on close, leaving that to this test.
        TestingTaskExecutorService taskExecutorService =
                TestingTaskExecutorService.newBuilder()
                        .setTerminationFuture(terminationFuture)
                        .withManualTerminationFutureCompletion()
                        .build();
        // Stored in the field (not a shadowing local) so that after() closes the runner even
        // when the assertion below fails.
        taskManagerRunner =
                createTaskManagerRunner(
                        createConfiguration(),
                        createTaskExecutorServiceFactory(taskExecutorService));

        taskManagerRunner.closeAsync();
        terminationFuture.complete(null);

        FlinkAssertions.assertThatFuture(taskManagerRunner.getTerminationFuture())
                .eventuallySucceeds()
                .isEqualTo(TaskManagerRunner.Result.SUCCESS);
    }

    /**
     * The working directory must exist while the runner is running and must be gone once the
     * runner has been closed.
     */
    @Test
    void testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exception {
        File workingDirBase = TempDirUtils.newFolder(temporaryFolder);
        ResourceID taskManagerResourceId = new ResourceID("foobar");
        Configuration configuration =
                createConfigurationWithWorkingDirectory(workingDirBase, taskManagerResourceId);
        File workingDir =
                ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
                        configuration, taskManagerResourceId);

        // The runner is closed by try-with-resources, so it is deliberately not registered in
        // the field for after().
        try (TaskManagerRunner ignored = createTaskManagerRunner(configuration)) {
            Assertions.assertThat(workingDir).exists();
        }

        Assertions.assertThat(workingDir)
                .withFailMessage(
                        "The working dir should be cleaned up when stopping the TaskManager process gracefully.")
                .doesNotExist();
    }

    /**
     * Creates a test configuration whose working directory base and TaskManager resource ID are
     * set explicitly.
     *
     * @param workingDirBase directory whose absolute path becomes the working directory base
     * @param taskManagerResourceId resource ID whose string form is configured for the TaskManager
     * @return the adjusted configuration
     */
    @Nonnull
    private Configuration createConfigurationWithWorkingDirectory(
            File workingDirBase, ResourceID taskManagerResourceId) {
        Configuration configuration = createConfiguration();
        configuration.set(
                ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDirBase.getAbsolutePath());
        configuration.set(
                TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, taskManagerResourceId.toString());
        return configuration;
    }

    /**
     * When the task executor service reports a fatal error on start, the working directory must
     * still exist after the runner has terminated.
     */
    @Test
    void testWorkingDirIsNotDeletedInCaseOfFailure() throws Exception {
        File workingDirBase = TempDirUtils.newFolder(temporaryFolder);
        ResourceID resourceId = ResourceID.generate();
        Configuration configuration =
                createConfigurationWithWorkingDirectory(workingDirBase, resourceId);
        // Stored in the field (not a shadowing local) so that after() closes the runner even
        // when the assertion below fails.
        taskManagerRunner =
                createTaskManagerRunner(
                        configuration, new TestingFailingTaskExecutorServiceFactory());

        taskManagerRunner.getTerminationFuture().join();

        Assertions.assertThat(
                        ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
                                configuration, resourceId))
                .exists();
    }

    /**
     * Creates a factory that ignores all of its arguments and always hands out the given service.
     *
     * @param taskExecutorService service instance the factory returns
     * @return factory returning {@code taskExecutorService}
     */
    @Nonnull
    private TaskManagerRunner.TaskExecutorServiceFactory createTaskExecutorServiceFactory(
            TestingTaskExecutorService taskExecutorService) {
        return (configuration,
                resourceID,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                localCommunicationOnly,
                externalResourceInfoProvider,
                workingDirectory,
                fatalErrorHandler,
                delegationTokenReceiverRepository) -> taskExecutorService;
    }

    /**
     * Creates the base configuration shared by all tests: JobManager address and TaskManager host
     * are set to {@code localhost}, and the result is passed through
     * {@link TaskExecutorResourceUtils#adjustForLocalExecution(Configuration)}.
     *
     * @return the base test configuration
     */
    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.ADDRESS, "localhost");
        configuration.set(TaskManagerOptions.HOST, "localhost");
        return TaskExecutorResourceUtils.adjustForLocalExecution(configuration);
    }

    /**
     * Creates and starts a runner that uses {@code TaskManagerRunner::createTaskExecutorService}
     * as its task executor service factory.
     *
     * @param configuration configuration for the runner
     * @return the started runner
     * @throws Exception if creating or starting the runner fails
     */
    private static TaskManagerRunner createTaskManagerRunner(Configuration configuration)
            throws Exception {
        return createTaskManagerRunner(configuration, TaskManagerRunner::createTaskExecutorService);
    }

    /**
     * Creates and starts a runner with the given task executor service factory.
     *
     * @param configuration configuration for the runner, also used to create the plugin manager
     * @param taskExecutorServiceFactory factory the runner uses to create its task executor service
     * @return the started runner
     * @throws Exception if creating or starting the runner fails
     */
    private static TaskManagerRunner createTaskManagerRunner(
            Configuration configuration,
            TaskManagerRunner.TaskExecutorServiceFactory taskExecutorServiceFactory)
            throws Exception {
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        TaskManagerRunner runner =
                new TaskManagerRunner(configuration, pluginManager, taskExecutorServiceFactory);
        runner.start();
        return runner;
    }

    /**
     * Factory producing a {@link TestingTaskExecutorService} whose start runnable reports a
     * {@link FlinkException} to the fatal error handler it was created with.
     */
    private static class TestingFailingTaskExecutorServiceFactory
            implements TaskManagerRunner.TaskExecutorServiceFactory {

        /**
         * Builds the failing service; only {@code fatalErrorHandler} is used, all other arguments
         * are ignored.
         *
         * @return service that calls {@code fatalErrorHandler.onFatalError(...)} from its start
         *     runnable
         */
        @Override
        public TaskManagerRunner.TaskExecutorService createTaskExecutor(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                BlobCacheService blobCacheService,
                boolean localCommunicationOnly,
                ExternalResourceInfoProvider externalResourceInfoProvider,
                WorkingDirectory workingDirectory,
                FatalErrorHandler fatalErrorHandler,
                DelegationTokenReceiverRepository delegationTokenReceiverRepository) {
            return TestingTaskExecutorService.newBuilder()
                    .setStartRunnable(
                            () ->
                                    fatalErrorHandler.onFatalError(
                                            new FlinkException(
                                                    "Cannot instantiate the TaskExecutorService.")))
                    .build();
        }
    }
}

