package org.apache.flink.runtime.rest.handler.job;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link JobSubmitHandler}.
 *
 * <p>Each test template is executed once with a plain {@link Configuration} ("no SSL") and once
 * for every entry of {@link SSLUtilsTest#AVAILABLE_SSL_PROVIDERS} with an SSL-enabled
 * configuration (see {@link #data()}).
 */
@ExtendWith(ParameterizedTestExtension.class)
public class JobSubmitHandlerTest {

    /** Per-test scratch directory; job graph files and the blob server storage live in here. */
    @TempDir
    private Path temporaryFolder;

    /** Configuration selected by the test parameter; passed to the handler and the blob server. */
    private final Configuration configuration;

    /** Blob server started before and closed after every test. */
    private BlobServer blobServer;

    /**
     * Builds the test parameters.
     *
     * @return one {@code (false, "no SSL")} tuple followed by one {@code (true, provider)} tuple
     *     per available SSL provider
     */
    @Parameters(name = "SSL enabled: {0}")
    public static Iterable<Tuple2<Boolean, String>> data() {
        ArrayList<Tuple2<Boolean, String>> parameters = new ArrayList<>(3);
        parameters.add(Tuple2.of(false, "no SSL"));
        for (String sslProvider : SSLUtilsTest.AVAILABLE_SSL_PROVIDERS) {
            parameters.add(Tuple2.of(true, sslProvider));
        }
        return parameters;
    }

    /**
     * Creates the test instance for one parameter tuple.
     *
     * @param withSsl {@code f0} tells whether SSL is enabled, {@code f1} names the SSL provider
     *     (only read when {@code f0} is {@code true})
     */
    public JobSubmitHandlerTest(Tuple2<Boolean, String> withSsl) {
        this.configuration =
                withSsl.f0
                        ? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(withSsl.f1)
                        : new Configuration();
    }

    @BeforeEach
    void setup() throws IOException {
        // The blob server gets its own copy of the configuration and a fresh storage folder.
        blobServer =
                new BlobServer(
                        new Configuration(configuration),
                        TempDirUtils.newFolder(temporaryFolder),
                        new VoidBlobStore());
        blobServer.start();
    }

    @AfterEach
    void teardown() throws IOException {
        if (blobServer != null) {
            blobServer.close();
        }
    }

    /**
     * A request that names a job graph file without uploading any file must be rejected with a
     * {@link RestHandlerException} carrying {@code BAD_REQUEST}.
     */
    @TestTemplate
    void testSerializationFailureHandling() throws Exception {
        Path jobGraphFile = TempDirUtils.newFile(temporaryFolder).toPath();
        TestingDispatcherGateway gateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                        .build();
        JobSubmitHandler handler = createHandler(gateway);

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());

        Assertions.assertThatThrownBy(
                        () ->
                                handler.handleRequest(
                                        HandlerRequest.create(
                                                request, EmptyMessageParameters.getInstance()),
                                        gateway))
                .isInstanceOf(RestHandlerException.class)
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ((RestHandlerException) e).getHttpResponseStatus())
                                        .isEqualTo(HttpResponseStatus.BAD_REQUEST));
    }

    /** Submitting a serialized empty job graph to an acknowledging gateway must succeed. */
    @TestTemplate
    void testSuccessfulJobSubmission() throws Exception {
        Path jobGraphFile = TempDirUtils.newFile(temporaryFolder).toPath();
        writeJobGraph(jobGraphFile, JobGraphTestUtils.emptyJobGraph());

        TestingDispatcherGateway.Builder builder = TestingDispatcherGateway.newBuilder();
        builder.setBlobServerPort(blobServer.getPort())
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .setHostname("localhost");
        TestingDispatcherGateway gateway = builder.build();
        JobSubmitHandler handler = createHandler(gateway);

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        handler.handleRequest(
                        HandlerRequest.create(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Collections.singleton(jobGraphFile.toFile())),
                        gateway)
                .get();
    }

    /**
     * Uploads one file more than the request body references and looks for a
     * {@link RestHandlerException} mentioning "count" in a resulting failure.
     */
    @TestTemplate
    void testRejectionOnCountMismatch() throws Exception {
        Path jobGraphFile = TempDirUtils.newFile(temporaryFolder).toPath();
        writeJobGraph(jobGraphFile, JobGraphTestUtils.emptyJobGraph());
        // Extra upload that the request body does not mention.
        Path countExceedingFile = TempDirUtils.newFile(temporaryFolder).toPath();

        TestingDispatcherGateway.Builder builder = TestingDispatcherGateway.newBuilder();
        builder.setBlobServerPort(blobServer.getPort())
                .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
                .setHostname("localhost");
        TestingDispatcherGateway gateway = builder.build();
        JobSubmitHandler handler = createHandler(gateway);

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        // NOTE(review): this test currently verifies nothing. A successful call is accepted, and
        // the result of findThrowable is discarded. Behaviour is kept as-is on purpose; before
        // turning this into a hard assertion, confirm that the handler really rejects this
        // request.
        try {
            handler.handleRequest(
                            HandlerRequest.create(
                                    request,
                                    EmptyMessageParameters.getInstance(),
                                    Arrays.asList(
                                            jobGraphFile.toFile(), countExceedingFile.toFile())),
                            gateway)
                    .get();
        } catch (Exception e) {
            ExceptionUtils.findThrowable(
                    e,
                    candidate ->
                            candidate instanceof RestHandlerException
                                    && candidate.getMessage().contains("count"));
        }
    }

    /**
     * Submits a job graph together with one jar and one distributed-cache artifact and checks
     * that the job graph handed to the gateway has one user jar blob key and one user artifact
     * whose blob key is set.
     */
    @TestTemplate
    void testFileHandling() throws Exception {
        final String dcEntryName = "entry";

        // Captures the job graph the handler passes to the gateway.
        CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>();
        TestingDispatcherGateway gateway =
                TestingDispatcherGateway.newBuilder()
                        .setBlobServerPort(blobServer.getPort())
                        .setSubmitFunction(
                                submittedJobGraph -> {
                                    submittedJobGraphFuture.complete(submittedJobGraph);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .build();
        JobSubmitHandler handler = createHandler(gateway);

        Path jobGraphFile = TempDirUtils.newFile(temporaryFolder).toPath();
        Path jarFile = TempDirUtils.newFile(temporaryFolder).toPath();
        Path artifactFile = TempDirUtils.newFile(temporaryFolder).toPath();

        JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        jobGraph.addUserArtifact(
                dcEntryName, new DistributedCache.DistributedCacheEntry("random", false));
        writeJobGraph(jobGraphFile, jobGraph);

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.singletonList(jarFile.getFileName().toString()),
                        Collections.singleton(
                                new JobSubmitRequestBody.DistributedCacheFile(
                                        dcEntryName, artifactFile.getFileName().toString())));

        handler.handleRequest(
                        HandlerRequest.create(
                                request,
                                EmptyMessageParameters.getInstance(),
                                Arrays.asList(
                                        jobGraphFile.toFile(),
                                        jarFile.toFile(),
                                        artifactFile.toFile())),
                        gateway)
                .get();

        Assertions.assertThat(submittedJobGraphFuture)
                .as("No JobGraph was submitted.")
                .isCompleted();
        JobGraph submittedJobGraph = submittedJobGraphFuture.get();
        Assertions.assertThat(submittedJobGraph.getUserJarBlobKeys()).hasSize(1);
        Assertions.assertThat(submittedJobGraph.getUserArtifacts()).hasSize(1);
        Assertions.assertThat(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey)
                .isNotNull();
    }

    /**
     * When the gateway's submit function fails with an exception, the future returned by the
     * handler must eventually fail with an exception whose message contains "test".
     */
    @TestTemplate
    void testFailedJobSubmission() throws Exception {
        final String errorMessage = "test";
        TestingDispatcherGateway gateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                new Exception(errorMessage)))
                        .build();
        JobSubmitHandler handler = createHandler(gateway);

        Path jobGraphFile = TempDirUtils.newFile(temporaryFolder).toPath();
        writeJobGraph(jobGraphFile, JobGraphTestUtils.emptyJobGraph());

        JobSubmitRequestBody request =
                new JobSubmitRequestBody(
                        jobGraphFile.getFileName().toString(),
                        Collections.emptyList(),
                        Collections.emptyList());

        FlinkAssertions.assertThatFuture(
                        handler.handleRequest(
                                HandlerRequest.create(
                                        request,
                                        EmptyMessageParameters.getInstance(),
                                        Collections.singletonList(jobGraphFile.toFile())),
                                gateway))
                .eventuallyFailsWith(Exception.class)
                .withMessageContaining(errorMessage);
    }

    /**
     * Creates a handler that always resolves to the given gateway, using this test's
     * configuration, a direct executor, no response headers and {@link RpcUtils#INF_TIMEOUT}.
     */
    private JobSubmitHandler createHandler(TestingDispatcherGateway gateway) {
        return new JobSubmitHandler(
                () -> CompletableFuture.completedFuture(gateway),
                RpcUtils.INF_TIMEOUT,
                Collections.emptyMap(),
                Executors.directExecutor(),
                configuration);
    }

    /**
     * Java-serializes {@code jobGraph} into {@code target}.
     *
     * <p>The stream is closed exactly once, right after writing, so a later failure in the
     * calling test is never masked by an exception from a second close attempt.
     *
     * @param target file to write to
     * @param jobGraph job graph to serialize
     * @throws IOException if the file cannot be opened or written
     */
    private static void writeJobGraph(Path target, JobGraph jobGraph) throws IOException {
        try (ObjectOutputStream objectOut =
                new ObjectOutputStream(Files.newOutputStream(target))) {
            objectOut.writeObject(jobGraph);
        }
    }
}
