package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Paths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}: creation through {@link
 * TaskManagerServices}, directory layout of the per-subtask local state stores, and cleanup of
 * local state directories on release, retain and shutdown.
 */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {

    /** Class-wide scratch folder that supplies the local state root directories for the tests. */
    @ClassRule
    public static final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Supplies a fresh {@link WorkingDirectory} for the tests that go through TaskManagerServices. */
    @ClassRule
    public static final WorkingDirectoryResource WORKING_DIRECTORY_RESOURCE = new WorkingDirectoryResource();

    /**
     * Checks that explicitly configured local state root directories and the local recovery flag
     * are picked up when the manager is created via {@link TaskManagerServices}.
     */
    @Test
    public void testCreationFromConfig() throws Exception {
        final String rootPrefix = temporaryFolder.newFolder().getAbsolutePath() + File.separator;
        // String#replace substitutes literally. String#replaceAll would interpret '\' and '$' in
        // the replacement (e.g. Windows path separators) as escape characters and corrupt the path.
        final String rootDirString =
                "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", rootPrefix);

        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
        config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);

        final WorkingDirectory workingDirectory = WORKING_DIRECTORY_RESOURCE.createNewWorkingDirectory();
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory), workingDirectory);
        try {
            final TaskExecutorLocalStateStoresManager storesManager =
                    taskManagerServices.getTaskManagerStateStore();

            // every returned root directory must live below the corresponding configured root
            final String[] configuredRoots = rootDirString.split(",");
            final File[] rootDirectories = storesManager.getLocalStateRootDirectories();
            for (int i = 0; i < configuredRoots.length; i++) {
                Assertions.assertThat(rootDirectories[i].toPath())
                        .startsWith(Paths.get(configuredRoots[i]));
            }

            Assert.assertTrue(storesManager.isLocalRecoveryEnabled());

            for (File rootDirectory : rootDirectories) {
                FileUtils.deleteFileOrDirectory(rootDirectory);
            }
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks that, with an empty configuration, every local state root directory equals the
     * working directory's local state directory and local recovery is disabled.
     */
    @Test
    public void testCreationFromConfigDefault() throws Exception {
        final Configuration config = new Configuration();
        final WorkingDirectory workingDirectory = WORKING_DIRECTORY_RESOURCE.createNewWorkingDirectory();
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory), workingDirectory);
        try {
            final TaskExecutorLocalStateStoresManager storesManager =
                    taskManagerServices.getTaskManagerStateStore();

            for (File rootDirectory : storesManager.getLocalStateRootDirectories()) {
                Assert.assertEquals(workingDirectory.getLocalStateDirectory(), rootDirectory);
            }

            Assert.assertFalse(storesManager.isLocalRecoveryEnabled());
        } finally {
            // shut down exactly once, whether or not the assertions above passed
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks that requesting a subtask store from a manager with local recovery disabled yields a
     * disabled recovery config without a directory provider and leaves the root directories empty.
     */
    @Test
    public void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final AllocationID allocationID = new AllocationID();
        final int subtaskIdx = 23;
        final File[] rootDirs = {
            temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
        };

        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        false, Reference.owned(rootDirs), Executors.directExecutor());
        final TaskLocalStateStore taskLocalStateStore =
                storesManager.localStateStoreForSubtask(jobID, allocationID, jobVertexID, subtaskIdx);

        Assert.assertFalse(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
        Assert.assertNull(
                taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider().orElse(null));

        for (File rootDir : rootDirs) {
            // fails with an assertion error (instead of an NPE) should the directory be missing
            Assertions.assertThat(rootDir.listFiles()).isEmpty();
        }
    }

    /**
     * Walks through the directory layout handed out for a subtask (allocation base directory and
     * checkpoint-specific directory), then checks that releasing the allocation and shutting the
     * manager down leave the root directories clean.
     */
    @Test
    public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final AllocationID allocationID = new AllocationID();
        final int subtaskIdx = 23;
        final long checkpointId = 42L;
        final File[] rootDirs = {
            temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
        };

        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(rootDirs), Executors.directExecutor());
        final TaskLocalStateStore taskLocalStateStore =
                storesManager.localStateStoreForSubtask(jobID, allocationID, jobVertexID, subtaskIdx);
        final LocalRecoveryDirectoryProvider directoryProvider =
                (LocalRecoveryDirectoryProvider)
                        taskLocalStateStore
                                .getLocalRecoveryConfig()
                                .getLocalStateDirectoryProvider()
                                .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

        // the allocation base directory is expected to rotate over the root directories
        final String allocationSubDir = storesManager.allocationSubDirString(allocationID);
        for (int i = 0; i < 10; i++) {
            final File expectedRoot = rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length];
            Assert.assertEquals(
                    new File(expectedRoot, allocationSubDir),
                    directoryProvider.allocationBaseDirectory(i));
        }

        final File checkpointBase = directoryProvider.allocationBaseDirectory(checkpointId);
        final File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
        final String expectedRelativePath =
                "jid_" + jobID + File.separator
                        + "vtx_" + jobVertexID + "_sti_" + subtaskIdx + File.separator
                        + "chk_" + checkpointId;
        Assert.assertEquals(new File(checkpointBase, expectedRelativePath), checkpointDir);

        Assert.assertTrue(checkpointDir.mkdirs());
        final File markerFile = new File(checkpointDir, "test");
        Assert.assertTrue(markerFile.createNewFile());

        Assert.assertEquals(
                storesManager.isLocalRecoveryEnabled(),
                taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
        Assert.assertTrue(markerFile.exists());

        // releasing the allocation must remove everything that was created for it
        storesManager.releaseLocalStateForAllocationId(allocationID);
        checkRootDirsClean(rootDirs);

        // state created for a second allocation must be removed by shutting the manager down
        final AllocationID otherAllocationID = new AllocationID();
        final LocalRecoveryDirectoryProvider otherDirectoryProvider =
                (LocalRecoveryDirectoryProvider)
                        storesManager
                                .localStateStoreForSubtask(jobID, otherAllocationID, jobVertexID, subtaskIdx)
                                .getLocalRecoveryConfig()
                                .getLocalStateDirectoryProvider()
                                .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
        final File otherCheckpointDir = otherDirectoryProvider.subtaskSpecificCheckpointDirectory(23L);
        Assert.assertTrue(otherCheckpointDir.mkdirs());
        Assert.assertTrue(new File(otherCheckpointDir, "test").createNewFile());

        storesManager.shutdown();
        checkRootDirsClean(rootDirs);
    }

    /** Checks that root directories passed as an owned reference no longer exist after shutdown. */
    @Test
    public void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {
        final File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder()};
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(rootDirs), Executors.directExecutor());

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }

        storesManager.shutdown();

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).doesNotExist();
        }
    }

    /** Checks that root directories passed as a borrowed reference still exist after shutdown. */
    @Test
    public void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOException {
        final File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder()};
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.borrowed(rootDirs), Executors.directExecutor());

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }

        storesManager.shutdown();

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }
    }

    /**
     * Checks that {@code retainLocalStateForAllocations} keeps the directory of the retained
     * allocation and removes the directory of the allocation that is not retained.
     */
    @Test
    public void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories() throws IOException {
        final File localStateRoot = temporaryFolder.newFolder();
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(new File[] {localStateRoot}), Executors.directExecutor());

        final JobID jobID = new JobID();
        final AllocationID retainedAllocationId = new AllocationID();
        final AllocationID otherAllocationId = new AllocationID();
        final JobVertexID jobVertexID = new JobVertexID();

        // requesting a store for each allocation is expected to create one directory per allocation
        storesManager.localStateStoreForSubtask(jobID, retainedAllocationId, jobVertexID, 0);
        storesManager.localStateStoreForSubtask(jobID, otherAllocationId, jobVertexID, 1);
        Assertions.assertThat(TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateRoot))
                .hasSize(2);

        storesManager.retainLocalStateForAllocations(Sets.newHashSet(retainedAllocationId));

        Assertions.assertThat(TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateRoot))
                .hasSize(1);
        Assertions.assertThat(
                        new File(localStateRoot, storesManager.allocationSubDirString(otherAllocationId)))
                .doesNotExist();
    }

    /**
     * Asserts that each of the given root directories contains no entries.
     *
     * <p>A directory that cannot be listed ({@code listFiles()} returns {@code null}, e.g. because
     * it was deleted) is accepted as clean.
     *
     * @param rootDirs the root directories to inspect
     */
    private static void checkRootDirsClean(File[] rootDirs) {
        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir.listFiles()).isNullOrEmpty();
        }
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} for local execution on this host.
     *
     * @param configuration the Flink configuration to derive the services configuration from
     * @param workingDirectory the working directory handed to the services configuration
     * @return the services configuration with a freshly generated {@link ResourceID}
     * @throws Exception if the local host cannot be resolved or the configuration is rejected
     */
    private static TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
            Configuration configuration, WorkingDirectory workingDirectory) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
                workingDirectory);
    }

    /**
     * Creates {@link TaskManagerServices} from the given configuration, using a void blob service,
     * an unregistered metric group and a direct executor service.
     *
     * @param taskManagerServicesConfiguration the services configuration to instantiate
     * @param workingDirectory the working directory handed to the services
     * @return the created services; the caller is responsible for calling {@code shutDown()}
     * @throws Exception if the services cannot be created
     */
    private static TaskManagerServices createTaskManagerServices(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            WorkingDirectory workingDirectory)
            throws Exception {
        return TaskManagerServices.fromConfiguration(
                taskManagerServicesConfiguration,
                VoidPermanentBlobService.INSTANCE,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                Executors.newDirectExecutorService(),
                ignored -> {
                    // errors reported through this handler are deliberately ignored in these tests
                },
                workingDirectory);
    }
}
