package org.apache.flink.runtime.jobmanager;

import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

/**
 * Integration test which checks that the per-job directories ({@code job_*}) inside the blob
 * storage directories ({@code blobStore-*}) below the configured blob base directory disappear
 * after a job finished, was cancelled, failed, or could not be submitted.
 */
@Category({AlsoRunWithLegacyScheduler.class})
public class BlobsCleanupITCase extends TestLogger {
    /** Pause in milliseconds between two consecutive checks of a blob directory. */
    private static final long RETRY_INTERVAL = 100;

    /** Holds the blob base directory as well as the temporary files uploaded by the tests. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private static MiniClusterResource miniClusterResource;
    private static UnmodifiableConfiguration configuration;
    private static File blobBaseDir;

    /**
     * Scenarios exercised by {@link #testBlobServerCleanup(TestCase)}.
     *
     * <p>The constant names (including the spelling of {@code JOB_FINISHES_SUCESSFULLY}) are kept
     * as they are because the enum is publicly visible.
     */
    public enum TestCase {
        JOB_FINISHES_SUCESSFULLY,
        JOB_IS_CANCELLED,
        JOB_FAILS,
        JOB_SUBMISSION_FAILS
    }

    /**
     * Starts a mini cluster (one task manager, two slots) whose blob storage lives in a fresh
     * sub-folder of {@link #TEMPORARY_FOLDER} and which uses a fixed-delay restart strategy with a
     * single restart attempt.
     *
     * @throws Exception if the folder cannot be created or the cluster fails to start
     */
    @BeforeClass
    public static void setup() throws Exception {
        blobBaseDir = TEMPORARY_FOLDER.newFolder();

        final Configuration config = new Configuration();
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
        config.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixeddelay");
        config.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        // Shortest possible cleanup interval so that the directories vanish quickly.
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
        configuration = new UnmodifiableConfiguration(config);

        miniClusterResource =
                new MiniClusterResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberSlotsPerTaskManager(2)
                                .setNumberTaskManagers(1)
                                .setConfiguration(configuration)
                                .build());
        miniClusterResource.before();
    }

    /** Shuts the mini cluster down if {@link #setup()} got far enough to create it. */
    @AfterClass
    public static void teardown() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    /** Blob directories must be cleaned up after a job finished successfully. */
    @Test
    public void testBlobServerCleanupFinishedJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
    }

    /** Blob directories must be cleaned up after a job was cancelled. */
    @Test
    public void testBlobServerCleanupCancelledJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
    }

    /** Blob directories must be cleaned up after a job failed. */
    @Test
    public void testBlobServerCleanupFailedJob() throws Exception {
        testBlobServerCleanup(TestCase.JOB_FAILS);
    }

    /** Blob directories must be cleaned up after a job submission failed. */
    @Test
    public void testBlobServerCleanupFailedSubmission() throws Exception {
        testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
    }

    /**
     * Uploads a blob for a new job, drives the job into the state requested by {@code testCase}
     * and then waits until every {@code blobStore-*} directory below the blob base directory no
     * longer contains {@code job_*} entries.
     *
     * @param testCase scenario to run
     * @throws Exception if the upload, the submission or waiting for the job result fails
     */
    private void testBlobServerCleanup(TestCase testCase) throws Exception {
        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        // Budget for the cleanup wait at the end; it starts running before the job is submitted.
        final Deadline timeout = Deadline.fromNow(Duration.ofSeconds(30L));

        final JobGraph jobGraph = createJobGraph(testCase, 2);
        final JobID jobId = jobGraph.getJobID();

        // Create the file below the rule-managed folder so that it is removed together with it
        // instead of being left behind in the system temp directory. It is a sibling of
        // blobBaseDir and therefore invisible to the directory scan below.
        final File tempBlob = File.createTempFile("Required", ".jar", TEMPORARY_FOLDER.getRoot());
        final int blobPort = miniCluster.getClusterInformation().getBlobServerPort();
        final List<PermanentBlobKey> keys =
                BlobClient.uploadFiles(
                        new InetSocketAddress("localhost", blobPort),
                        configuration,
                        jobId,
                        Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
        Assert.assertThat(keys, Matchers.hasSize(1));
        jobGraph.addUserJarBlobKey(keys.get(0));

        if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
            // Reference a key that was never uploaded; the submission is expected to be rejected.
            jobGraph.addUserJarBlobKey(new PermanentBlobKey());
        }

        final CompletableFuture<JobSubmissionResult> submissionFuture =
                miniCluster.submitJob(jobGraph);

        if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
            try {
                submissionFuture.get();
                Assert.fail("Expected job submission failure.");
            } catch (ExecutionException e) {
                Assert.assertThat(
                        ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(),
                        Matchers.is(true));
            }
        } else {
            Assert.assertThat(submissionFuture.get().getJobID(), Matchers.is(jobId));

            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster.requestJobResult(jobId);

            if (testCase == TestCase.JOB_FAILS) {
                // NOTE(review): presumably releases the blocked tasks so that they start failing
                // - confirm against FailingBlockingInvokable.
                FailingBlockingInvokable.unblock();

                final JobResult jobResult = jobResultFuture.get();
                Assert.assertThat(jobResult.isSuccess(), Matchers.is(false));
                Assert.assertThat(
                        jobResult.getApplicationStatus(), Matchers.is(ApplicationStatus.FAILED));
            } else if (testCase == TestCase.JOB_IS_CANCELLED) {
                miniCluster.cancelJob(jobId);

                final JobResult jobResult = jobResultFuture.get();
                Assert.assertThat(jobResult.isSuccess(), Matchers.is(false));
                Assert.assertThat(
                        jobResult.getApplicationStatus(), Matchers.is(ApplicationStatus.CANCELED));
            } else {
                final JobResult jobResult = jobResultFuture.get();
                // Only used as the assertion message; null if the job reported no error.
                final Throwable cause =
                        jobResult
                                .getSerializedThrowable()
                                .map(t -> t.deserializeError(getClass().getClassLoader()))
                                .orElse(null);
                Assert.assertThat(
                        ExceptionUtils.stringifyException(cause),
                        jobResult.isSuccess(),
                        Matchers.is(true));
            }
        }

        final File[] blobDirs = blobBaseDir.listFiles((dir, name) -> name.startsWith("blobStore-"));
        Assert.assertNotNull(blobDirs);
        for (File blobDir : blobDirs) {
            waitForEmptyBlobDir(blobDir, timeout.timeLeft());
        }
    }

    /**
     * Builds a job graph named "BlobCleanupTest" with a single vertex whose invokable depends on
     * the scenario.
     *
     * @param testCase scenario selecting the invokable class of the vertex
     * @param parallelism parallelism of the vertex
     * @return the new job graph
     */
    @Nonnull
    private JobGraph createJobGraph(TestCase testCase, int parallelism) {
        final JobVertex source = new JobVertex("Source");
        if (testCase == TestCase.JOB_FAILS) {
            source.setInvokableClass(FailingBlockingInvokable.class);
        } else if (testCase == TestCase.JOB_IS_CANCELLED) {
            // NOTE(review): presumably keeps the job running until it is cancelled - confirm.
            source.setInvokableClass(BlockingNoOpInvokable.class);
        } else {
            source.setInvokableClass(NoOpInvokable.class);
        }
        source.setParallelism(parallelism);
        return new JobGraph("BlobCleanupTest", source);
    }

    /**
     * Polls {@code blobDir} every {@link #RETRY_INTERVAL} milliseconds until it contains no entry
     * starting with {@code job_} and fails the test if that does not happen within
     * {@code remaining}. A directory that cannot be listed counts as empty. The directory is
     * checked at least once, even if {@code remaining} is zero or negative.
     *
     * @param blobDir directory to watch
     * @param remaining maximum time to wait
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    private static void waitForEmptyBlobDir(File blobDir, Duration remaining)
            throws InterruptedException {
        // nanoTime is monotonic, so adjustments of the system clock cannot stretch or cut the wait.
        final long deadlineNanos = System.nanoTime() + remaining.toNanos();
        final FilenameFilter jobDirFilter = (dir, name) -> name.startsWith("job_");

        String[] leftovers;
        do {
            leftovers = blobDir.list(jobDirFilter);
            if (leftovers == null || leftovers.length == 0) {
                return;
            }
            Thread.sleep(RETRY_INTERVAL);
        } while (System.nanoTime() - deadlineNanos < 0);

        Assert.fail("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(leftovers));
    }
}
