/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.filecache;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FileCacheDirectoriesTest {
    private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\nProlog im Himmel.\nDer Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\nErzengel treten vor.\nRAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\nUnd ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\ngibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\nhohen Werke Sind herrlich wie am ersten Tag.\nGABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\nPracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\nschaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\nFels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\nMICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\naufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\nDa flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\ndeine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private FileCache fileCache;
    private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    private final PermanentBlobService blobService = new PermanentBlobService(){

        public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
            if (key.equals((Object)FileCacheDirectoriesTest.this.permanentBlobKey)) {
                java.nio.file.Path directory = FileCacheDirectoriesTest.this.temporaryFolder.newFolder("zipArchive").toPath();
                java.nio.file.Path containedFile = directory.resolve("cacheFile");
                Files.copy(new ByteArrayInputStream(FileCacheDirectoriesTest.testFileContent.getBytes(StandardCharsets.UTF_8)), containedFile, new CopyOption[0]);
                Path zipPath = FileUtils.compressDirectory((Path)new Path(directory.toString()), (Path)new Path(directory + ".zip"));
                return new File(zipPath.getPath());
            }
            throw new IllegalArgumentException("This service contains only entry for " + FileCacheDirectoriesTest.this.permanentBlobKey);
        }

        public void close() throws IOException {
        }
    };
    private static final int CLEANUP_INTERVAL = 1000;
    private DeleteCapturingDirectScheduledExecutorService executorService = new DeleteCapturingDirectScheduledExecutorService();

    @Before
    public void setup() throws Exception {
        this.fileCache = new FileCache(new String[]{this.temporaryFolder.newFolder().getAbsolutePath()}, this.blobService, (ScheduledExecutorService)this.executorService, 1000L);
    }

    @After
    public void shutdown() {
        this.fileCache.shutdown();
        if (this.executorService.lastDeleteProcess != null) {
            this.executorService.lastDeleteProcess.run();
        }
    }

    @Test
    public void testDirectoryDownloadedFromBlob() throws Exception {
        DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry("test_file", Boolean.valueOf(false), InstantiationUtil.serializeObject((Object)this.permanentBlobKey), true);
        this.testDirectoryDownloaded(entry);
    }

    @Test
    public void testDirectoryDownloadedFromDFS() throws Exception {
        String zippedFile = this.blobService.getFile(new JobID(), this.permanentBlobKey).getAbsolutePath();
        DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(zippedFile, Boolean.valueOf(false), null, true);
        this.testDirectoryDownloaded(entry);
    }

    @Test
    public void testDirectoryCleanUp() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID attemptID1 = ExecutionGraphTestUtils.createExecutionAttemptId();
        ExecutionAttemptID attemptID2 = ExecutionGraphTestUtils.createExecutionAttemptId();
        String fileName = "test_file";
        DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry("test_file", Boolean.valueOf(false), InstantiationUtil.serializeObject((Object)this.permanentBlobKey), true);
        Future copyResult = this.fileCache.createTmpFile("test_file", entry, jobID, attemptID1);
        this.fileCache.createTmpFile("test_file", entry, jobID, attemptID2);
        Path dstPath = (Path)copyResult.get();
        FileSystem fs = dstPath.getFileSystem();
        FileStatus fileStatus = fs.getFileStatus(dstPath);
        Path cacheFile = new Path(dstPath, "cacheFile");
        Assert.assertTrue((boolean)fileStatus.isDir());
        Assert.assertTrue((boolean)fs.exists(cacheFile));
        this.fileCache.releaseJob(jobID, attemptID1);
        Assert.assertTrue((boolean)fileStatus.isDir());
        Assert.assertTrue((boolean)fs.exists(cacheFile));
        this.fileCache.releaseJob(jobID, attemptID2);
        Assert.assertTrue((boolean)fileStatus.isDir());
        Assert.assertTrue((boolean)fs.exists(cacheFile));
        Assert.assertEquals((long)1000L, (long)this.executorService.lastDelayMillis);
        this.executorService.lastDeleteProcess.run();
        Assert.assertFalse((boolean)fs.exists(dstPath));
        Assert.assertFalse((boolean)fs.exists(cacheFile));
    }

    private void testDirectoryDownloaded(DistributedCache.DistributedCacheEntry entry) throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID attemptID = ExecutionGraphTestUtils.createExecutionAttemptId();
        String fileName = "test_file";
        Future copyResult = this.fileCache.createTmpFile("test_file", entry, jobID, attemptID);
        Path dstPath = (Path)copyResult.get();
        FileSystem fs = dstPath.getFileSystem();
        FileStatus fileStatus = fs.getFileStatus(dstPath);
        Assert.assertTrue((boolean)fileStatus.isDir());
        Path cacheFile = new Path(dstPath, "cacheFile");
        Assert.assertTrue((boolean)fs.exists(cacheFile));
        String actualContent = FileUtils.readFileUtf8((File)new File(cacheFile.getPath()));
        Assert.assertEquals((Object)testFileContent, (Object)actualContent);
    }

    private final class DeleteCapturingDirectScheduledExecutorService
    extends DirectScheduledExecutorService {
        FileCache.DeleteProcess lastDeleteProcess;
        long lastDelayMillis;

        private DeleteCapturingDirectScheduledExecutorService() {
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            if (command instanceof FileCache.DeleteProcess) {
                Assert.assertNull((String)"Multiple delete process registered", (Object)this.lastDeleteProcess);
                this.lastDeleteProcess = (FileCache.DeleteProcess)command;
                this.lastDelayMillis = unit.toMillis(delay);
                return super.schedule(() -> {}, delay, unit);
            }
            return super.schedule(command, delay, unit);
        }
    }
}

