package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.shaded.guava31.com.google.common.net.InetAddresses;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.opentest4j.TestAbortedException;

/**
 * Tests for the configuration handling of {@link TaskManagerRunner}: how the TaskManager RPC
 * service is created from a {@link Configuration}, how the configuration is loaded from a
 * configuration directory plus dynamic properties, and how the node id of the
 * {@link TaskManagerServicesConfiguration} is derived.
 */
@NotThreadSafe
class TaskManagerRunnerConfigurationTest {
    private static final RpcSystem RPC_SYSTEM = RpcSystem.load();

    /** Upper bound, in seconds, for waiting on the asynchronous shutdown of an RPC service. */
    private static final int TEST_TIMEOUT_SECONDS = 10;

    @TempDir
    private Path temporaryFolder;

    /**
     * Creates the RPC service with an explicitly configured TaskManager host and checks that the
     * service reports exactly that host as its address and a non-negative port.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() throws Exception {
        final String taskManagerHostname = "testhostname";
        final Configuration config =
                createFlinkConfigWithPredefinedTaskManagerHostname(taskManagerHostname);
        final HighAvailabilityServices highAvailabilityServices =
                createHighAvailabilityServices(config);

        RpcService taskManagerRpcService = null;
        try {
            taskManagerRpcService =
                    TaskManagerRunner.createRpcService(config, highAvailabilityServices, RPC_SYSTEM);

            Assertions.assertThat(taskManagerRpcService.getPort()).isGreaterThanOrEqualTo(0);
            Assertions.assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskManagerHostname);
        } finally {
            closeRpcAndHighAvailabilityServices(taskManagerRpcService, highAvailabilityServices);
        }
    }

    /**
     * Creates the RPC service with the {@link HostBindPolicy#NAME} bind policy and checks that the
     * resulting address is neither {@code null} nor empty.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception {
        final Configuration config = createFlinkConfigWithHostBindPolicy(HostBindPolicy.NAME);
        final HighAvailabilityServices highAvailabilityServices =
                createHighAvailabilityServices(config);

        RpcService taskManagerRpcService = null;
        try {
            taskManagerRpcService =
                    TaskManagerRunner.createRpcService(config, highAvailabilityServices, RPC_SYSTEM);

            Assertions.assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty();
        } finally {
            closeRpcAndHighAvailabilityServices(taskManagerRpcService, highAvailabilityServices);
        }
    }

    /**
     * Points the JobManager port at a locally opened server socket and checks that the address of
     * the created RPC service is a literal IP address.
     */
    @Test
    void testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager()
            throws Exception {
        final ServerSocket testJobManagerSocket = openServerSocket();
        // Guard the socket from the moment it exists so that a failure while building the
        // configuration or the HA services cannot leak it.
        try {
            final Configuration config =
                    createFlinkConfigWithJobManagerPort(testJobManagerSocket.getLocalPort());
            final HighAvailabilityServices highAvailabilityServices =
                    createHighAvailabilityServices(config);

            RpcService taskManagerRpcService = null;
            try {
                taskManagerRpcService =
                        TaskManagerRunner.createRpcService(
                                config, highAvailabilityServices, RPC_SYSTEM);

                Assertions.assertThat(taskManagerRpcService.getAddress())
                        .matches(InetAddresses::isInetAddress);
            } finally {
                closeRpcAndHighAvailabilityServices(
                        taskManagerRpcService, highAvailabilityServices);
            }
        } finally {
            IOUtils.closeQuietly(testJobManagerSocket);
        }
    }

    /**
     * Configures the RPC port as {@code "-1"} and checks that creating the RPC service fails with
     * an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() throws Exception {
        // Copy into a mutable Configuration because the factory returns an unmodifiable one.
        final Configuration config =
                new Configuration(createFlinkConfigWithPredefinedTaskManagerHostname("example.org"));
        config.setString(TaskManagerOptions.RPC_PORT, "-1");

        final HighAvailabilityServices highAvailabilityServices =
                createHighAvailabilityServices(config);
        try {
            Assertions.assertThatThrownBy(
                            () ->
                                    TaskManagerRunner.createRpcService(
                                            config, highAvailabilityServices, RPC_SYSTEM))
                    .isInstanceOf(IllegalArgumentException.class)
                    .hasMessage("Invalid port range definition: -1");
        } finally {
            highAvailabilityServices.closeAndCleanupAllData();
        }
    }

    /**
     * Writes an {@code fs.default-scheme} entry into a {@code flink-conf.yaml}, loads it through
     * {@link TaskManagerRunner#loadConfiguration(String[])} and checks that the file system picks
     * up that URI as its default.
     */
    @Test
    void testDefaultFsParameterLoading() throws Exception {
        try {
            final File confDir = createTemporaryDirectory();
            final File confFile = new File(confDir, "flink-conf.yaml");
            final URI defaultFsUri = new URI("otherFS", null, "localhost", 1234, null, null, null);

            // try-with-resources closes (and thereby flushes) the writer even if writing fails.
            try (PrintWriter writer = new PrintWriter(confFile)) {
                writer.println("fs.default-scheme: " + defaultFsUri);
            }

            final String[] args = {"--configDir", confDir.toString()};
            FileSystem.initialize(TaskManagerRunner.loadConfiguration(args));

            Assertions.assertThat(FileSystem.getDefaultFsUri()).isEqualTo(defaultFsUri);
        } finally {
            // FileSystem is initialized through static calls; re-initialize it with an empty
            // configuration so the settings loaded above are not left in place.
            FileSystem.initialize(new Configuration());
        }
    }

    /**
     * Checks that {@code -D} dynamic properties are merged into the configuration loaded from the
     * configuration directory: a value present in both places resolves to the dynamic one, a value
     * only in the file is kept, and a value only given dynamically is available.
     */
    @Test
    void testLoadDynamicalProperties() throws IOException, FlinkParseException {
        final File confDir = createTemporaryDirectory();
        final String managedMemory = "268435456b";
        final String jobManagerHost = "host1";
        final int jobManagerPort = 12345;

        try (PrintWriter writer = new PrintWriter(new File(confDir, "flink-conf.yaml"))) {
            writer.println(JobManagerOptions.ADDRESS.key() + ": localhost");
            writer.println(TaskManagerOptions.MANAGED_MEMORY_SIZE.key() + ": " + managedMemory);
        }

        final String[] args = {
            "--configDir",
            confDir.toString(),
            "-D" + JobManagerOptions.ADDRESS.key() + "=" + jobManagerHost,
            "-D" + JobManagerOptions.PORT.key() + "=" + jobManagerPort
        };
        final Configuration configuration = TaskManagerRunner.loadConfiguration(args);

        final MemorySize loadedManagedMemory =
                configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE);
        final String loadedAddress = configuration.get(JobManagerOptions.ADDRESS);

        Assertions.assertThat(loadedManagedMemory).isEqualTo(MemorySize.parse(managedMemory));
        Assertions.assertThat(loadedAddress).isEqualTo(jobManagerHost);
        Assertions.assertThat(configuration.getInteger(JobManagerOptions.PORT))
                .isEqualTo(jobManagerPort);
    }

    /** Checks that an explicitly configured node id is reported unchanged. */
    @Test
    void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception {
        final String nodeId = "node1";
        final Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId);

        final TaskManagerServicesConfiguration servicesConfiguration =
                createTaskManagerServiceConfiguration(configuration);

        Assertions.assertThat(servicesConfiguration.getNodeId()).isEqualTo(nodeId);
    }

    /**
     * Checks that, without an explicit node id, the node id equals the external address handed to
     * {@link TaskManagerServicesConfiguration#fromConfiguration} (the local host name here).
     */
    @Test
    void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception {
        final TaskManagerServicesConfiguration servicesConfiguration =
                createTaskManagerServiceConfiguration(new Configuration());

        Assertions.assertThat(servicesConfiguration.getNodeId())
                .isEqualTo(InetAddress.getLocalHost().getHostName());
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} from the given configuration, using a
     * generated resource id, the local host name as external address and a fresh working
     * directory below the test's temporary folder.
     */
    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
            Configuration configuration) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
                WorkingDirectory.create(createTemporaryDirectory()));
    }

    /** Creates a new, uniquely named directory below the JUnit-managed temporary folder. */
    private File createTemporaryDirectory() throws IOException {
        return Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()).toFile();
    }

    /**
     * Returns an unmodifiable configuration with the given TaskManager host and {@code localhost}
     * as JobManager address.
     */
    private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname(String str) {
        final Configuration config = new Configuration();
        config.setString(TaskManagerOptions.HOST, str);
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        return new UnmodifiableConfiguration(config);
    }

    /**
     * Returns an unmodifiable configuration with the given host bind policy, {@code localhost} as
     * JobManager address and a lookup timeout of 10 milliseconds.
     */
    private static Configuration createFlinkConfigWithHostBindPolicy(HostBindPolicy hostBindPolicy) {
        final Configuration config = new Configuration();
        config.setString(TaskManagerOptions.HOST_BIND_POLICY, hostBindPolicy.toString());
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10L));
        return new UnmodifiableConfiguration(config);
    }

    /**
     * Returns an unmodifiable configuration with {@code localhost} as JobManager address and the
     * given JobManager port.
     */
    private static Configuration createFlinkConfigWithJobManagerPort(int i) {
        final Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setInteger(JobManagerOptions.PORT, i);
        return new UnmodifiableConfiguration(config);
    }

    /**
     * Creates high availability services for the given configuration. The shared
     * {@link #RPC_SYSTEM} is reused so that no additional RPC system is loaded per test.
     */
    private HighAvailabilityServices createHighAvailabilityServices(Configuration configuration)
            throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(
                configuration,
                Executors.directExecutor(),
                AddressResolution.NO_ADDRESS_RESOLUTION,
                RPC_SYSTEM,
                NoOpFatalErrorHandler.INSTANCE);
    }

    /**
     * Opens a server socket on an ephemeral port.
     *
     * @throws TestAbortedException if the socket cannot be opened; the underlying
     *     {@link IOException} is attached as the cause
     */
    private static ServerSocket openServerSocket() {
        try {
            return new ServerSocket(0);
        } catch (IOException e) {
            throw new TestAbortedException("Skip test because could not open a server socket", e);
        }
    }

    /**
     * Closes the RPC service (if any) and then cleans up the high availability services. The HA
     * cleanup runs even if closing the RPC service fails.
     */
    private static void closeRpcAndHighAvailabilityServices(
            @Nullable RpcService rpcService, HighAvailabilityServices highAvailabilityServices)
            throws Exception {
        try {
            maybeCloseRpcService(rpcService);
        } finally {
            highAvailabilityServices.closeAndCleanupAllData();
        }
    }

    /**
     * Closes the given RPC service and waits up to {@link #TEST_TIMEOUT_SECONDS} seconds for the
     * shutdown to complete. Does nothing when {@code rpcService} is {@code null}.
     */
    private static void maybeCloseRpcService(@Nullable RpcService rpcService) throws Exception {
        if (rpcService != null) {
            rpcService.closeAsync().get(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
    }
}
