/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@link JobSubmitHandler}.
 *
 * <p>The suite is parameterized: it runs once with a plain {@link Configuration} and once per
 * entry of {@link SSLUtilsTest#AVAILABLE_SSL_PROVIDERS} with an SSL configuration created by
 * {@link SSLUtilsTest#createInternalSslConfigWithKeyAndTrustStores(String)}.
 */
@RunWith(Parameterized.class)
public class JobSubmitHandlerTest extends TestLogger {

    /** Shared scratch directory for job graph files, jars, artifacts and the blob storage. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Configuration handed to the handler under test; SSL-enabled or empty, per parameter. */
    private final Configuration configuration;

    /** Blob server started before and closed after every test. */
    private BlobServer blobServer;

    /**
     * Supplies the test parameters.
     *
     * @return one {@code (false, "no SSL")} tuple followed by one {@code (true, provider)} tuple
     *     for each available SSL provider
     */
    @Parameterized.Parameters(name = "SSL enabled: {0}")
    public static Iterable<Tuple2<Boolean, String>> data() {
        ArrayList<Tuple2<Boolean, String>> parameters = new ArrayList<>(3);
        parameters.add(Tuple2.of(false, "no SSL"));
        for (String sslProvider : SSLUtilsTest.AVAILABLE_SSL_PROVIDERS) {
            parameters.add(Tuple2.of(true, sslProvider));
        }
        return parameters;
    }

    /**
     * Creates the test instance for one parameter tuple.
     *
     * @param withSsl {@code f0} selects whether SSL is enabled, {@code f1} names the SSL provider
     */
    public JobSubmitHandlerTest(Tuple2<Boolean, String> withSsl) {
        this.configuration = withSsl.f0
                ? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(withSsl.f1)
                : new Configuration();
    }

    /** Starts a blob server that stores its data in a fresh temporary folder. */
    @Before
    public void setup() throws IOException {
        Configuration config = new Configuration(configuration);
        config.setString(
                BlobServerOptions.STORAGE_DIRECTORY,
                TEMPORARY_FOLDER.newFolder().getAbsolutePath());

        blobServer = new BlobServer(config, new VoidBlobStore());
        blobServer.start();
    }

    /** Closes the blob server if {@link #setup()} managed to create it. */
    @After
    public void teardown() throws IOException {
        if (blobServer != null) {
            blobServer.close();
        }
    }

    /**
     * Submits a request that names a job graph file but carries no uploaded files and expects
     * the handler to answer with {@code BAD_REQUEST}.
     */
    @Test
    public void testSerializationFailureHandling() throws Exception {
        Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();

        TestingDispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .build();
        JobSubmitHandler handler = createHandler(mockGateway);

        JobSubmitRequestBody request = new JobSubmitRequestBody(
                jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());

        try {
            handler.handleRequest(
                    new HandlerRequest<>(request, EmptyMessageParameters.getInstance()),
                    mockGateway);
            Assert.fail();
        } catch (RestHandlerException rhe) {
            Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus());
        }
    }

    /** Uploads a serialized job graph and expects the returned future to complete normally. */
    @Test
    public void testSuccessfulJobSubmission() throws Exception {
        Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
        writeJobGraph(jobGraphFile, new JobGraph("testjob"));

        TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
        builder.setBlobServerPort(blobServer.getPort())
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .setHostname("localhost");
        TestingDispatcherGateway mockGateway = builder.build();
        JobSubmitHandler handler = createHandler(mockGateway);

        JobSubmitRequestBody request = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                Collections.emptyList(),
                Collections.emptyList());

        handler.handleRequest(
                        new HandlerRequest<>(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Collections.emptyMap(),
                                Collections.emptyMap(),
                                Collections.singleton(jobGraphFile.toFile())),
                        mockGateway)
                .get();
    }

    /**
     * Uploads one file more than the request body references. If the submission fails, the
     * failure must be a {@link RestHandlerException} whose message mentions the file count.
     */
    @Test
    public void testRejectionOnCountMismatch() throws Exception {
        Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
        writeJobGraph(jobGraphFile, new JobGraph("testjob"));
        // Extra upload that the request body does not reference.
        Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();

        TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
        builder.setBlobServerPort(blobServer.getPort())
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .setHostname("localhost");
        TestingDispatcherGateway mockGateway = builder.build();
        JobSubmitHandler handler = createHandler(mockGateway);

        JobSubmitRequestBody request = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                Collections.emptyList(),
                Collections.emptyList());

        try {
            handler.handleRequest(
                            new HandlerRequest<>(
                                    request,
                                    EmptyMessageParameters.getInstance(),
                                    Collections.emptyMap(),
                                    Collections.emptyMap(),
                                    Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())),
                            mockGateway)
                    .get();
            // NOTE(review): a test named "rejection" would normally call Assert.fail() here.
            // Whether the handler really rejects surplus uploads cannot be told from this
            // file - TODO confirm against JobSubmitHandler and add the fail() if it does.
        } catch (Exception e) {
            // The lookup result used to be discarded, so any exception let the test pass.
            Assert.assertTrue(
                    "Expected a RestHandlerException mentioning the file count, but got: " + e,
                    ExceptionUtils.findThrowable(
                                    e,
                                    candidate -> candidate instanceof RestHandlerException
                                            && candidate.getMessage() != null
                                            && candidate.getMessage().contains("count"))
                            .isPresent());
        }
    }

    /**
     * Uploads a job graph together with one jar and one distributed-cache artifact and checks
     * that the job graph handed to the gateway references one jar blob key and one artifact
     * whose blob key is set.
     */
    @Test
    public void testFileHandling() throws Exception {
        final String dcEntryName = "entry";

        // Captures the job graph that the handler passes to the gateway.
        CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>();
        TestingDispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder()
                .setBlobServerPort(blobServer.getPort())
                .setSubmitFunction(graph -> {
                    submittedJobGraphFuture.complete(graph);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .build();
        JobSubmitHandler handler = createHandler(dispatcherGateway);

        Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
        Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
        Path artifactFile = TEMPORARY_FOLDER.newFile().toPath();

        JobGraph jobGraph = new JobGraph(new JobVertex[0]);
        jobGraph.addUserArtifact(
                dcEntryName,
                new DistributedCache.DistributedCacheEntry("random", Boolean.FALSE));
        writeJobGraph(jobGraphFile, jobGraph);

        JobSubmitRequestBody request = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                Collections.singletonList(jarFile.getFileName().toString()),
                Collections.singleton(
                        new JobSubmitRequestBody.DistributedCacheFile(
                                dcEntryName, artifactFile.getFileName().toString())));

        handler.handleRequest(
                        new HandlerRequest<>(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Collections.emptyMap(),
                                Collections.emptyMap(),
                                Arrays.asList(
                                        jobGraphFile.toFile(),
                                        jarFile.toFile(),
                                        artifactFile.toFile())),
                        dispatcherGateway)
                .get();

        Assert.assertTrue("No JobGraph was submitted.", submittedJobGraphFuture.isDone());
        JobGraph submittedJobGraph = submittedJobGraphFuture.get();
        Assert.assertEquals(1, submittedJobGraph.getUserJarBlobKeys().size());
        Assert.assertEquals(1, submittedJobGraph.getUserArtifacts().size());
        Assert.assertNotNull(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey);
    }

    /**
     * Lets the gateway fail the submission and expects the handler's future to fail with an
     * exception carrying the gateway's error message.
     */
    @Test
    public void testFailedJobSubmission() throws Exception {
        final String errorMessage = "test";

        TestingDispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
                .setSubmitFunction(
                        jobGraph -> FutureUtils.completedExceptionally(new Exception(errorMessage)))
                .build();
        JobSubmitHandler handler = createHandler(mockGateway);

        Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
        writeJobGraph(jobGraphFile, new JobGraph("testjob"));

        JobSubmitRequestBody request = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                Collections.emptyList(),
                Collections.emptyList());

        try {
            handler.handleRequest(
                            new HandlerRequest<>(
                                    request,
                                    EmptyMessageParameters.getInstance(),
                                    Collections.emptyMap(),
                                    Collections.emptyMap(),
                                    Collections.singletonList(jobGraphFile.toFile())),
                            mockGateway)
                    .get();
            // Without this the test passed even when the failure was swallowed. The thrown
            // AssertionError is an Error, so the catch (Exception) below does not intercept it.
            Assert.fail("Expected the job submission to fail.");
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            Assert.assertEquals(errorMessage, t.getMessage());
        }
    }

    /** Builds a handler that always resolves to the given gateway and uses this test's configuration. */
    private JobSubmitHandler createHandler(DispatcherGateway gateway) {
        return new JobSubmitHandler(
                () -> CompletableFuture.completedFuture(gateway),
                RpcUtils.INF_TIMEOUT,
                Collections.emptyMap(),
                TestingUtils.defaultExecutor(),
                configuration);
    }

    /** Java-serializes the given job graph into the given file. */
    private static void writeJobGraph(Path target, JobGraph jobGraph) throws IOException {
        try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(target))) {
            objectOut.writeObject(jobGraph);
        }
    }
}

