/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration test checking that job-related directories in the blob storage directory disappear
 * once a job is done, for each of the outcomes listed in {@link TestCase}.
 *
 * <p>A single {@link MiniCluster} (one task manager with two slots) is shared by all test methods.
 * Its blob storage is rooted in a folder managed by {@link #TEMPORARY_FOLDER}, so the test can
 * inspect the directory contents directly.
 */
public class BlobsCleanupITCase extends TestLogger {

    /** Pause between two inspections of a blob directory, in milliseconds. */
    private static final long RETRY_INTERVAL = 100L;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private static MiniClusterResource miniClusterResource;

    private static UnmodifiableConfiguration configuration;

    /** Directory handed to the cluster as blob storage directory. */
    private static File blobBaseDir;

    /**
     * Starts the shared mini cluster with the blob storage directory pointing at a fresh temporary
     * folder, a fixed-delay restart strategy with a single attempt and a blob cleanup interval of
     * 1 (in the unit defined by {@link BlobServerOptions#CLEANUP_INTERVAL}).
     */
    @BeforeClass
    public static void setup() throws Exception {
        blobBaseDir = TEMPORARY_FOLDER.newFolder();

        Configuration cfg = new Configuration();
        cfg.set(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
        cfg.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        cfg.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        cfg.set(BlobServerOptions.CLEANUP_INTERVAL, 1L);
        configuration = new UnmodifiableConfiguration(cfg);

        miniClusterResource =
                new MiniClusterResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberSlotsPerTaskManager(2)
                                .setNumberTaskManagers(1)
                                .setConfiguration(configuration)
                                .build());
        miniClusterResource.before();
    }

    /** Shuts the shared mini cluster down if it was started. */
    @AfterClass
    public static void teardown() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    @Test
    public void testBlobServerCleanupFinishedJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
    }

    @Test
    public void testBlobServerCleanupCancelledJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
    }

    @Test
    public void testBlobServerCleanupFailedJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_FAILS);
    }

    @Test
    public void testBlobServerCleanupFailedSubmission() throws Exception {
        testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
    }

    /**
     * Uploads a blob for a job, drives the job to the outcome described by {@code testCase} and
     * then waits until no {@code job_*} entries are left in any {@code blobStore-*} directory
     * below the blob base directory.
     *
     * @param testCase the job outcome to provoke
     * @throws Exception if the cluster interaction fails or the waiting thread is interrupted
     */
    private void testBlobServerCleanup(TestCase testCase) throws Exception {
        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        final int numTasks = 2;
        // One deadline for the whole cleanup wait; the job itself is awaited without a timeout.
        final Deadline timeout = Deadline.fromNow(Duration.ofSeconds(30L));

        final JobGraph jobGraph = createJobGraph(testCase, numTasks);
        final JobID jid = jobGraph.getJobID();

        jobGraph.addUserJarBlobKey(uploadUserJar(miniCluster, jid));
        if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
            // Reference a second blob key that was never uploaded for this job.
            jobGraph.addUserJarBlobKey(new PermanentBlobKey());
        }

        final CompletableFuture<JobSubmissionResult> submissionFuture =
                miniCluster.submitJob(jobGraph);

        if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
            awaitSubmissionFailure(submissionFuture);
        } else {
            final JobSubmissionResult jobSubmissionResult = submissionFuture.get();
            Assert.assertThat(jobSubmissionResult.getJobID(), Matchers.is(jid));
            verifyJobResult(testCase, miniCluster, jid);
        }

        final File[] blobDirs = blobBaseDir.listFiles((dir, name) -> name.startsWith("blobStore-"));
        Assert.assertNotNull(blobDirs);
        for (File blobDir : blobDirs) {
            waitForEmptyBlobDir(blobDir, timeout.timeLeft());
        }
    }

    /**
     * Uploads a freshly created empty file for the given job through a {@link BlobClient}
     * connected to the cluster's blob server port on {@code localhost}.
     *
     * <p>The file is created inside {@link #TEMPORARY_FOLDER} so that it is removed together with
     * the rule's folder instead of being left behind in the system temp directory.
     *
     * @return the single blob key returned by the upload
     */
    private static PermanentBlobKey uploadUserJar(MiniCluster miniCluster, JobID jid)
            throws Exception {
        final File tempBlob = File.createTempFile("Required", ".jar", TEMPORARY_FOLDER.getRoot());
        final int blobPort = miniCluster.getClusterInformation().getBlobServerPort();

        final List<PermanentBlobKey> keys =
                BlobClient.uploadFiles(
                        new InetSocketAddress("localhost", blobPort),
                        configuration,
                        jid,
                        Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
        Assert.assertThat(keys, Matchers.hasSize(1));
        return keys.get(0);
    }

    /**
     * Waits for the submission to complete and asserts that it failed with a cause chain that
     * contains a {@link JobSubmissionException}.
     */
    private static void awaitSubmissionFailure(
            CompletableFuture<JobSubmissionResult> submissionFuture) throws InterruptedException {
        try {
            submissionFuture.get();
            Assert.fail("Expected job submission failure.");
        } catch (ExecutionException e) {
            Assert.assertThat(
                    ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(),
                    Matchers.is(true));
        }
    }

    /**
     * Requests the result of an accepted job, triggers the failure or cancellation where the test
     * case asks for it, and asserts the final job result.
     */
    private void verifyJobResult(TestCase testCase, MiniCluster miniCluster, JobID jid)
            throws Exception {
        // Request the result before unblocking/cancelling, as the job may terminate right after.
        final CompletableFuture<JobResult> resultFuture = miniCluster.requestJobResult(jid);

        if (testCase == TestCase.JOB_FAILS) {
            FailingBlockingInvokable.unblock();

            final JobResult jobResult = resultFuture.get();
            Assert.assertThat(jobResult.isSuccess(), Matchers.is(false));
            Assert.assertThat(
                    jobResult.getApplicationStatus(), Matchers.is(ApplicationStatus.FAILED));
        } else if (testCase == TestCase.JOB_IS_CANCELLED) {
            miniCluster.cancelJob(jid);

            final JobResult jobResult = resultFuture.get();
            Assert.assertThat(jobResult.isSuccess(), Matchers.is(false));
            Assert.assertThat(
                    jobResult.getApplicationStatus(), Matchers.is(ApplicationStatus.CANCELED));
        } else {
            final JobResult jobResult = resultFuture.get();
            // Only used to produce a helpful assertion message if the job did not succeed.
            final Throwable cause =
                    jobResult
                            .getSerializedThrowable()
                            .map(
                                    throwable ->
                                            throwable.deserializeError(
                                                    getClass().getClassLoader()))
                            .orElse(null);
            Assert.assertThat(
                    ExceptionUtils.stringifyException(cause),
                    jobResult.isSuccess(),
                    Matchers.is(true));
        }
    }

    /**
     * Builds a streaming job graph with a single vertex named {@code "Source"} whose invokable
     * depends on the test case: a failing blocking task for {@link TestCase#JOB_FAILS}, a blocking
     * no-op task for {@link TestCase#JOB_IS_CANCELLED} and a plain no-op task otherwise.
     *
     * <p>The job id is derived from the test case's ordinal, so each test case gets its own id.
     *
     * @param testCase the outcome the job should be prepared for
     * @param numTasks parallelism of the single vertex
     * @return the job graph
     */
    @Nonnull
    private JobGraph createJobGraph(TestCase testCase, int numTasks) {
        final JobVertex source = new JobVertex("Source");
        if (testCase == TestCase.JOB_FAILS) {
            source.setInvokableClass(FailingBlockingInvokable.class);
        } else if (testCase == TestCase.JOB_IS_CANCELLED) {
            source.setInvokableClass(BlockingNoOpInvokable.class);
        } else {
            source.setInvokableClass(NoOpInvokable.class);
        }
        source.setParallelism(numTasks);

        return JobGraphBuilder.newStreamingJobGraphBuilder()
                .setJobId(new JobID(0L, testCase.ordinal()))
                .addJobVertex(source)
                .build();
    }

    /**
     * Polls {@code blobDir} every {@link #RETRY_INTERVAL} milliseconds until it no longer contains
     * entries whose name starts with {@code "job_"}, or fails the test once {@code remaining} has
     * elapsed. The directory is inspected at least once, even if {@code remaining} is already
     * used up; a directory that cannot be listed counts as empty.
     *
     * @param blobDir directory to inspect
     * @param remaining maximum time to wait
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static void waitForEmptyBlobDir(File blobDir, Duration remaining)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + remaining.toMillis();
        final FilenameFilter jobDirFilter = (dir, name) -> name.startsWith("job_");

        String[] blobDirContents;
        do {
            blobDirContents = blobDir.list(jobDirFilter);
            if (blobDirContents == null || blobDirContents.length == 0) {
                return;
            }
            Thread.sleep(RETRY_INTERVAL);
        } while (System.currentTimeMillis() < deadline);

        Assert.fail(
                "Timeout while waiting for "
                        + blobDir.getAbsolutePath()
                        + " to become empty. Current contents: "
                        + Arrays.toString(blobDirContents));
    }

    /** Job outcomes covered by this test. */
    private enum TestCase {
        JOB_FINISHES_SUCESSFULLY,
        JOB_IS_CANCELLED,
        JOB_FAILS,
        JOB_SUBMISSION_FAILS
    }
}

