package org.apache.flink.runtime.io.disk;

import java.io.File;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/FileChannelManagerImplTest.class */
public class FileChannelManagerImplTest extends TestLogger {
    // Passed as the second constructor argument of FileChannelManagerImpl by the cleanup runner;
    // the kill test later searches the tmp directory for entries whose name contains it.
    private static final String DIR_NAME_PREFIX = "manager-test";
    // Name of the marker file the child process creates; the test waits for it before sending "kill".
    private static final String SIGNAL_FILE_FOR_KILLING = "could-kill";

    // JUnit rule providing the temporary folders used by every test in this class.
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Logger shared with the nested runner class, which runs in a separate JVM.
    private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImplTest.class);
    // Base timeout: used directly as the termination deadline and tripled for the
    // marker-file wait and for the child process' sleep.
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10);

    /* loaded from: input_file:org/apache/flink/runtime/io/disk/FileChannelManagerImplTest$FileChannelManagerCleanupRunner.class */
    /**
     * Entry point executed in the child JVM started by {@code FileChannelManagerTestProcess}.
     *
     * <p>It creates a {@code FileChannelManagerImpl}, optionally registers an additional
     * shutdown hook that closes it, creates the signal file and then sleeps so that the
     * parent test has time to kill the process.
     */
    public static class FileChannelManagerCleanupRunner {

        /**
         * Runs the child process.
         *
         * @param strArr three positional arguments: {@code [0]} whether to register the extra
         *     "Caller" shutdown hook (parsed with {@link Boolean#parseBoolean}), {@code [1]} the
         *     directory handed to the file channel manager, {@code [2]} the path of the signal
         *     file to create
         * @throws Exception if creating the manager or the signal file fails, or the sleep is
         *     interrupted
         */
        public static void main(String[] strArr) throws Exception {
            final boolean callerHasHook = Boolean.parseBoolean(strArr[0]);
            final String tmpDirectory = strArr[1];
            final String signalFilePath = strArr[2];

            FileChannelManagerImplTest.LOG.info("The FileChannelManagerCleanupRunner process has started");

            final FileChannelManagerImpl manager =
                    new FileChannelManagerImpl(
                            new String[] {tmpDirectory}, FileChannelManagerImplTest.DIR_NAME_PREFIX);

            // Optionally register a second hook that also closes the manager.
            if (callerHasHook) {
                ShutdownHookUtil.addShutdownHook(
                        () -> manager.close(), "Caller", FileChannelManagerImplTest.LOG);
            }

            FileChannelManagerImplTest.LOG.info("The FileChannelManagerCleanupRunner is going to create the new file");
            final File signalFile = new File(signalFilePath);
            signalFile.createNewFile();
            FileChannelManagerImplTest.LOG.info("The FileChannelManagerCleanupRunner has created the new file");

            // Stay alive long enough for the parent test to send the kill signal.
            final long sleepMillis = 3 * FileChannelManagerImplTest.TEST_TIMEOUT.toMillis();
            Thread.sleep(sleepMillis);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/FileChannelManagerImplTest$FileChannelManagerTestProcess.class */
    /**
     * Describes the child JVM used by the kill tests: it launches
     * {@link FileChannelManagerCleanupRunner} with the hook flag, the tmp directory and the
     * signal file path as arguments.
     */
    public static class FileChannelManagerTestProcess extends TestJvmProcess {
        private final boolean callerHasHook;
        private final String tmpDirectories;
        private final String signalFilePath;

        /**
         * @param callerHasHook whether the runner should register its own shutdown hook
         * @param tmpDirectories directory passed to the runner for the file channel manager
         * @param signalFilePath path of the marker file the runner creates once it is set up
         */
        FileChannelManagerTestProcess(boolean callerHasHook, String tmpDirectories, String signalFilePath)
                throws Exception {
            this.callerHasHook = callerHasHook;
            this.tmpDirectories = tmpDirectories;
            this.signalFilePath = signalFilePath;
        }

        /** Returns the fixed display name of this test process. */
        @Override // org.apache.flink.runtime.testutils.TestJvmProcess
        public String getName() {
            return "File Channel Manager Test";
        }

        /** Returns the three arguments in the order the runner's {@code main} reads them. */
        @Override // org.apache.flink.runtime.testutils.TestJvmProcess
        public String[] getJvmArgs() {
            final String[] args = new String[3];
            args[0] = String.valueOf(this.callerHasHook);
            args[1] = this.tmpDirectories;
            args[2] = this.signalFilePath;
            return args;
        }

        /** Returns the fully qualified name of {@link FileChannelManagerCleanupRunner}. */
        @Override // org.apache.flink.runtime.testutils.TestJvmProcess
        public String getEntryPointClassName() {
            final Class<?> entryPoint = FileChannelManagerCleanupRunner.class;
            return entryPoint.getName();
        }
    }

    /**
     * Creates one million channels from ten concurrent threads on a manager configured with
     * two directories and asserts that both directories received the same number of channels.
     *
     * @throws Exception if a temporary folder cannot be created, a join is interrupted, or
     *     closing the manager fails
     */
    @Test
    public void testFairness() throws Exception {
        final String firstDir = this.temporaryFolder.newFolder().getAbsoluteFile().getAbsolutePath();
        final String secondDir = this.temporaryFolder.newFolder().getAbsoluteFile().getAbsolutePath();
        final FileChannelManagerImpl manager =
                new FileChannelManagerImpl(new String[] {firstDir, secondDir}, "test");

        // Close the manager even when the assertion fails, so it is not leaked by the test.
        try {
            final int channelsPerThread = 100000;
            final int numThreads = 10;
            final AtomicInteger firstDirCount = new AtomicInteger();
            final AtomicInteger secondDirCount = new AtomicInteger();

            final Thread[] workers = new Thread[numThreads];
            for (int t = 0; t < numThreads; t++) {
                workers[t] = new Thread(() -> {
                    for (int c = 0; c < channelsPerThread; c++) {
                        // Attribute each new channel to the directory its path starts with.
                        if (manager.createChannel().getPath().startsWith(firstDir)) {
                            firstDirCount.incrementAndGet();
                        } else {
                            secondDirCount.incrementAndGet();
                        }
                    }
                });
                workers[t].start();
            }

            for (Thread worker : workers) {
                worker.join();
            }

            Assert.assertEquals(firstDirCount.get(), secondDirCount.get());
        } finally {
            manager.close();
        }
    }

    /** Runs the kill/cleanup scenario with a child process that registers no extra shutdown hook. */
    @Test
    public void testDirectoriesCleanupOnKillWithoutCallerHook() throws Exception {
        final boolean callerHasHook = false;
        testDirectoriesCleanupOnKill(callerHasHook);
    }

    /** Runs the kill/cleanup scenario with a child process that registers its own shutdown hook. */
    @Test
    public void testDirectoriesCleanupOnKillWithCallerHook() throws Exception {
        final boolean callerHasHook = true;
        testDirectoriesCleanupOnKill(callerHasHook);
    }

    /**
     * Starts a child JVM that owns a {@code FileChannelManagerImpl}, kills it with the
     * {@code kill} command once it has signalled readiness, and verifies that the process
     * terminates within {@code TEST_TIMEOUT} and leaves no entry containing
     * {@code DIR_NAME_PREFIX} in the tmp directory.
     *
     * <p>The test is skipped on operating systems other than Linux, FreeBSD, Solaris and macOS.
     *
     * @param z whether the child process registers its own shutdown hook that closes the manager
     * @throws Exception if a temporary folder cannot be created, or starting, waiting for or
     *     killing the child process fails
     */
    private void testDirectoriesCleanupOnKill(boolean z) throws Exception {
        Assume.assumeTrue(
                OperatingSystem.isLinux()
                        || OperatingSystem.isFreeBSD()
                        || OperatingSystem.isSolaris()
                        || OperatingSystem.isMac());

        File channelDir = this.temporaryFolder.newFolder();
        File signalDir = this.temporaryFolder.newFolder();
        File signalFile = new File(signalDir.getAbsolutePath(), SIGNAL_FILE_FOR_KILLING);

        FileChannelManagerTestProcess testProcess =
                new FileChannelManagerTestProcess(
                        z, channelDir.getAbsolutePath(), signalFile.getAbsolutePath());
        try {
            testProcess.startProcess();

            // Wait until the child has created its manager (and optional hook) before killing it.
            TestJvmProcess.waitForMarkerFile(signalFile, 3 * TEST_TIMEOUT.toMillis());

            // Keep a handle on the "kill" process so that its exit code can be checked.
            Process kill =
                    new ProcessBuilder("kill", String.valueOf(testProcess.getProcessId())).start();
            int killExitCode = kill.waitFor();
            Assert.assertEquals("Failed to send SIG_TERM to process", 0, killExitCode);

            // Poll until the child has exited or the deadline has passed.
            Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
            while (testProcess.isAlive() && deadline.hasTimeLeft()) {
                Thread.sleep(100L);
            }

            Assert.assertFalse(
                    "The file channel manager test process does not terminate in time, its output is: \n"
                            + testProcess.getProcessOutput(),
                    testProcess.isAlive());
            Assert.assertFalse(
                    "The file channel manager test process does not remove the tmp shuffle directories after termination, its output is \n"
                            + testProcess.getProcessOutput(),
                    fileOrDirExists(channelDir, DIR_NAME_PREFIX));
        } finally {
            // Always tear the child process down, whether the test passed or failed.
            testProcess.destroy();
        }
    }

    /**
     * Checks whether the given directory has at least one direct child whose name contains
     * the given fragment.
     *
     * @param file the directory to inspect
     * @param str the fragment to look for in the child names
     * @return {@code true} if a matching child exists; {@code false} if there is none or the
     *     directory listing is {@code null}
     */
    private boolean fileOrDirExists(File file, String str) {
        String[] childNames = file.list();
        if (childNames == null) {
            return false;
        }
        for (String childName : childNames) {
            if (childName.contains(str)) {
                return true;
            }
        }
        return false;
    }
}
