/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.taskexecutor.HostBindPolicy;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.net.InetAddresses;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.opentest4j.TestAbortedException;

@NotThreadSafe
class TaskManagerRunnerConfigurationTest {
    private static final RpcSystem RPC_SYSTEM = RpcSystem.load();
    private static final int TEST_TIMEOUT_SECONDS = 10;
    @TempDir
    private Path temporaryFolder;

    TaskManagerRunnerConfigurationTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() throws Exception {
        String taskmanagerHost = "testhostname";
        Configuration config = TaskManagerRunnerConfigurationTest.createFlinkConfigWithPredefinedTaskManagerHostname("testhostname");
        HighAvailabilityServices highAvailabilityServices = this.createHighAvailabilityServices(config);
        RpcService taskManagerRpcService = null;
        try {
            taskManagerRpcService = TaskManagerRunner.createRpcService((Configuration)config, (HighAvailabilityServices)highAvailabilityServices, (RpcSystem)RPC_SYSTEM);
            Assertions.assertThat((int)taskManagerRpcService.getPort()).isGreaterThanOrEqualTo(0);
            Assertions.assertThat((String)taskManagerRpcService.getAddress()).isEqualTo("testhostname");
        }
        catch (Throwable throwable) {
            TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
            highAvailabilityServices.closeWithOptionalClean(true);
            throw throwable;
        }
        TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
        highAvailabilityServices.closeWithOptionalClean(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception {
        Configuration config = TaskManagerRunnerConfigurationTest.createFlinkConfigWithHostBindPolicy(HostBindPolicy.NAME);
        HighAvailabilityServices highAvailabilityServices = this.createHighAvailabilityServices(config);
        RpcService taskManagerRpcService = null;
        try {
            taskManagerRpcService = TaskManagerRunner.createRpcService((Configuration)config, (HighAvailabilityServices)highAvailabilityServices, (RpcSystem)RPC_SYSTEM);
            ((AbstractStringAssert)Assertions.assertThat((String)taskManagerRpcService.getAddress()).isNotNull()).isNotEmpty();
        }
        catch (Throwable throwable) {
            TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
            highAvailabilityServices.closeWithOptionalClean(true);
            throw throwable;
        }
        TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
        highAvailabilityServices.closeWithOptionalClean(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager() throws Exception {
        ServerSocket testJobManagerSocket = TaskManagerRunnerConfigurationTest.openServerSocket();
        Configuration config = TaskManagerRunnerConfigurationTest.createFlinkConfigWithJobManagerPort(testJobManagerSocket.getLocalPort());
        HighAvailabilityServices highAvailabilityServices = this.createHighAvailabilityServices(config);
        RpcService taskManagerRpcService = null;
        try {
            taskManagerRpcService = TaskManagerRunner.createRpcService((Configuration)config, (HighAvailabilityServices)highAvailabilityServices, (RpcSystem)RPC_SYSTEM);
            Assertions.assertThat((String)taskManagerRpcService.getAddress()).matches(InetAddresses::isInetAddress);
        }
        catch (Throwable throwable) {
            TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
            highAvailabilityServices.closeWithOptionalClean(true);
            IOUtils.closeQuietly((AutoCloseable)testJobManagerSocket);
            throw throwable;
        }
        TaskManagerRunnerConfigurationTest.maybeCloseRpcService(taskManagerRpcService);
        highAvailabilityServices.closeWithOptionalClean(true);
        IOUtils.closeQuietly((AutoCloseable)testJobManagerSocket);
    }

    @Test
    void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() throws Exception {
        Configuration config = new Configuration(TaskManagerRunnerConfigurationTest.createFlinkConfigWithPredefinedTaskManagerHostname("example.org"));
        config.set(TaskManagerOptions.RPC_PORT, (Object)"-1");
        HighAvailabilityServices highAvailabilityServices = this.createHighAvailabilityServices(config);
        try {
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> TaskManagerRunner.createRpcService((Configuration)config, (HighAvailabilityServices)highAvailabilityServices, (RpcSystem)RPC_SYSTEM)).isInstanceOf(IllegalArgumentException.class)).hasMessage("Invalid port range definition: -1");
        }
        finally {
            highAvailabilityServices.closeWithOptionalClean(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDefaultFsParameterLoading() throws Exception {
        try {
            File tmpDir = Files.createTempDirectory(this.temporaryFolder, UUID.randomUUID().toString(), new FileAttribute[0]).toFile();
            File confFile = new File(tmpDir, "config.yaml");
            URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null);
            PrintWriter pw1 = new PrintWriter(confFile);
            pw1.println("fs.default-scheme: " + defaultFS);
            pw1.close();
            String[] args = new String[]{"--configDir", tmpDir.toString()};
            Configuration configuration = TaskManagerRunner.loadConfiguration((String[])args);
            FileSystem.initialize((Configuration)configuration);
            Assertions.assertThat((URI)defaultFS).isEqualTo((Object)FileSystem.getDefaultFsUri());
        }
        finally {
            FileSystem.initialize((Configuration)new Configuration());
        }
    }

    @Test
    void testLoadDynamicalProperties() throws IOException, FlinkParseException {
        File tmpDir = Files.createTempDirectory(this.temporaryFolder, UUID.randomUUID().toString(), new FileAttribute[0]).toFile();
        File confFile = new File(tmpDir, "config.yaml");
        PrintWriter pw1 = new PrintWriter(confFile);
        long managedMemory = 0x10000000L;
        pw1.println(JobManagerOptions.ADDRESS.key() + ": localhost");
        pw1.println(TaskManagerOptions.MANAGED_MEMORY_SIZE.key() + ": " + 0x10000000L + "b");
        pw1.close();
        String jmHost = "host1";
        int jmPort = 12345;
        String[] args = new String[]{"--configDir", tmpDir.toString(), "-D" + JobManagerOptions.ADDRESS.key() + "=" + "host1", "-D" + JobManagerOptions.PORT.key() + "=" + 12345};
        Configuration configuration = TaskManagerRunner.loadConfiguration((String[])args);
        Assertions.assertThat((Comparable)MemorySize.parse((String)"268435456b")).isEqualTo(configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE));
        Assertions.assertThat((String)"host1").isEqualTo((String)configuration.get(JobManagerOptions.ADDRESS));
        Assertions.assertThat((int)12345).isEqualTo(configuration.get(JobManagerOptions.PORT));
    }

    @Test
    void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception {
        String nodeId = "node1";
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, (Object)nodeId);
        TaskManagerServicesConfiguration servicesConfiguration = this.createTaskManagerServiceConfiguration(configuration);
        Assertions.assertThat((String)servicesConfiguration.getNodeId()).isEqualTo(nodeId);
    }

    @Test
    void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception {
        TaskManagerServicesConfiguration servicesConfiguration = this.createTaskManagerServiceConfiguration(new Configuration());
        Assertions.assertThat((String)servicesConfiguration.getNodeId()).isEqualTo(InetAddress.getLocalHost().getHostName());
    }

    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(Configuration config) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration((Configuration)config, (ResourceID)ResourceID.generate(), (String)InetAddress.getLocalHost().getHostName(), (boolean)true, (TaskExecutorResourceSpec)TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution((Configuration)config), (WorkingDirectory)WorkingDirectory.create((File)Files.createTempDirectory(this.temporaryFolder, UUID.randomUUID().toString(), new FileAttribute[0]).toFile()));
    }

    private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname(String taskmanagerHost) {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.HOST, (Object)taskmanagerHost);
        config.set(JobManagerOptions.ADDRESS, (Object)"localhost");
        return new UnmodifiableConfiguration(config);
    }

    private static Configuration createFlinkConfigWithHostBindPolicy(HostBindPolicy bindPolicy) {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.HOST_BIND_POLICY, (Object)bindPolicy.toString());
        config.set(JobManagerOptions.ADDRESS, (Object)"localhost");
        config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, (Object)Duration.ofMillis(10L));
        return new UnmodifiableConfiguration(config);
    }

    private static Configuration createFlinkConfigWithJobManagerPort(int port) {
        Configuration config = new Configuration();
        config.set(JobManagerOptions.ADDRESS, (Object)"localhost");
        config.set(JobManagerOptions.PORT, (Object)port);
        return new UnmodifiableConfiguration(config);
    }

    private HighAvailabilityServices createHighAvailabilityServices(Configuration config) throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)config, (Executor)Executors.directExecutor(), (AddressResolution)AddressResolution.NO_ADDRESS_RESOLUTION, (RpcSystemUtils)RpcSystem.load(), (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
    }

    private static ServerSocket openServerSocket() {
        try {
            return new ServerSocket(0);
        }
        catch (IOException e) {
            throw new TestAbortedException("Skip test because could not open a server socket");
        }
    }

    private static void maybeCloseRpcService(@Nullable RpcService rpcService) throws Exception {
        if (rpcService != null) {
            rpcService.closeAsync().get(10L, TimeUnit.SECONDS);
        }
    }
}

