/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.ConnectionClosedException;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameter;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RestServerEndpointITCase
extends TestLogger {
    private static final JobID PATH_JOB_ID = new JobID();
    private static final JobID QUERY_JOB_ID = new JobID();
    private static final String JOB_ID_KEY = "jobid";
    private static final Time timeout = Time.seconds((long)10L);
    private static final int TEST_REST_MAX_CONTENT_LENGTH = 4096;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private RestServerEndpoint serverEndpoint;
    private RestClient restClient;
    private TestUploadHandler testUploadHandler;
    private InetSocketAddress serverAddress;
    private final Configuration config;
    private SSLContext defaultSSLContext;
    private SSLSocketFactory defaultSSLSocketFactory;
    private TestHandler testHandler;

    public RestServerEndpointITCase(Configuration config) {
        this.config = Objects.requireNonNull(config);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> data() throws Exception {
        Configuration config = RestServerEndpointITCase.getBaseConfig();
        String truststorePath = RestServerEndpointITCase.getTestResource("local127.truststore").getAbsolutePath();
        String keystorePath = RestServerEndpointITCase.getTestResource("local127.keystore").getAbsolutePath();
        Configuration sslConfig = new Configuration(config);
        sslConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
        sslConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, truststorePath);
        sslConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
        sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, keystorePath);
        sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
        sslConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
        Configuration sslRestAuthConfig = new Configuration(sslConfig);
        sslRestAuthConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
        Configuration sslPinningRestAuthConfig = new Configuration(sslRestAuthConfig);
        sslPinningRestAuthConfig.setString(SecurityOptions.SSL_REST_CERT_FINGERPRINT, SSLUtilsTest.getRestCertificateFingerprint(sslPinningRestAuthConfig, "flink.test"));
        return Arrays.asList({config}, {sslConfig}, {sslRestAuthConfig}, {sslPinningRestAuthConfig});
    }

    private static Configuration getBaseConfig() {
        String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
        Configuration config = new Configuration();
        config.setString(RestOptions.BIND_PORT, "0");
        config.setString(RestOptions.BIND_ADDRESS, loopbackAddress);
        config.setString(RestOptions.ADDRESS, loopbackAddress);
        config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 4096);
        config.setInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH, 4096);
        return config;
    }

    @Before
    public void setup() throws Exception {
        this.config.setString(WebOptions.UPLOAD_DIR, this.temporaryFolder.newFolder().getCanonicalPath());
        this.defaultSSLContext = SSLContext.getDefault();
        this.defaultSSLSocketFactory = HttpsURLConnection.getDefaultSSLSocketFactory();
        SSLContext sslClientContext = SSLUtils.createRestSSLContext((Configuration)this.config, (boolean)true);
        if (sslClientContext != null) {
            SSLContext.setDefault(sslClientContext);
            HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
        }
        RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration((Configuration)this.config);
        RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration((Configuration)this.config);
        TestingRestfulGateway mockRestfulGateway = new TestingRestfulGateway.Builder().build();
        GatewayRetriever mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
        this.testHandler = new TestHandler((GatewayRetriever<RestfulGateway>)mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        TestVersionHandler testVersionHandler = new TestVersionHandler((GatewayRetriever<? extends RestfulGateway>)mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1(mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2(mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        this.testUploadHandler = new TestUploadHandler(mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        StaticFileServerHandler staticFileServerHandler = new StaticFileServerHandler(mockGatewayRetriever, RpcUtils.INF_TIMEOUT, this.temporaryFolder.getRoot());
        List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(Tuple2.of((Object)new TestHeaders(), (Object)((Object)this.testHandler)), Tuple2.of((Object)((Object)TestUploadHeaders.INSTANCE), (Object)((Object)this.testUploadHandler)), Tuple2.of((Object)testVersionHandler.getMessageHeaders(), (Object)((Object)testVersionHandler)), Tuple2.of((Object)testVersionSelectionHandler1.getMessageHeaders(), (Object)((Object)testVersionSelectionHandler1)), Tuple2.of((Object)testVersionSelectionHandler2.getMessageHeaders(), (Object)((Object)testVersionSelectionHandler2)), Tuple2.of((Object)WebContentHandlerSpecification.getInstance(), (Object)staticFileServerHandler));
        this.serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
        this.restClient = new TestRestClient(clientConfig);
        this.serverEndpoint.start();
        this.serverAddress = this.serverEndpoint.getServerAddress();
    }

    @After
    public void teardown() throws Exception {
        if (this.defaultSSLContext != null) {
            SSLContext.setDefault(this.defaultSSLContext);
            HttpsURLConnection.setDefaultSSLSocketFactory(this.defaultSSLSocketFactory);
        }
        if (this.restClient != null) {
            this.restClient.shutdown(timeout);
            this.restClient = null;
        }
        if (this.serverEndpoint != null) {
            this.serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
            this.serverEndpoint = null;
        }
    }

    @Test
    public void testRequestInterleaving() throws Exception {
        HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
        this.testHandler.handlerBody = id -> {
            if (id == 1) {
                handlerBlocker.arriveAndBlock();
            }
            return CompletableFuture.completedFuture(new TestResponse((int)id));
        };
        CompletableFuture<TestResponse> response1 = this.sendRequestToTestHandler(new TestRequest(1));
        handlerBlocker.awaitRequestToArrive();
        CompletableFuture<TestResponse> response2 = this.sendRequestToTestHandler(new TestRequest(2));
        Assert.assertEquals((long)2L, (long)response2.get().id);
        handlerBlocker.unblockRequest();
        Assert.assertEquals((long)1L, (long)response1.get().id);
    }

    @Test
    public void testBadHandlerRequest() throws Exception {
        FaultyTestParameters parameters = new FaultyTestParameters();
        parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
        ((TestParameters)parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        CompletableFuture response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)new TestHeaders(), (MessageParameters)parameters, (RequestBody)new TestRequest(2));
        try {
            response.get();
            Assert.fail((String)"The request should fail with a bad request return code.");
        }
        catch (ExecutionException ee) {
            Throwable t = ExceptionUtils.stripExecutionException((Throwable)ee);
            Assert.assertTrue((boolean)(t instanceof RestClientException));
            RestClientException rce = (RestClientException)t;
            Assert.assertEquals((Object)HttpResponseStatus.BAD_REQUEST, (Object)rce.getHttpResponseStatus());
        }
    }

    @Test
    public void testShouldRespectMaxContentLengthLimitForRequests() throws Exception {
        this.testHandler.handlerBody = id -> {
            throw new AssertionError((Object)"Request should not arrive at server.");
        };
        try {
            this.sendRequestToTestHandler(new TestRequest(2, RestServerEndpointITCase.createStringOfSize(4096))).get();
            Assert.fail((String)"Expected exception not thrown");
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(RestClientException.class));
            Assert.assertThat((Object)throwable.getMessage(), (Matcher)Matchers.containsString((String)"Try to raise"));
        }
    }

    @Test
    public void testShouldRespectMaxContentLengthLimitForResponses() throws Exception {
        this.testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse((int)id, RestServerEndpointITCase.createStringOfSize(4096)));
        try {
            this.sendRequestToTestHandler(new TestRequest(1)).get();
            Assert.fail((String)"Expected exception not thrown");
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(TooLongFrameException.class));
            Assert.assertThat((Object)throwable.getMessage(), (Matcher)Matchers.containsString((String)"Try to raise"));
        }
    }

    @Test
    public void testFileUpload() throws Exception {
        String boundary = RestServerEndpointITCase.generateMultiPartBoundary();
        String crlf = "\r\n";
        String uploadedContent = "hello";
        HttpURLConnection connection = this.openHttpConnectionForUpload(boundary);
        try (OutputStream output = connection.getOutputStream();
             PrintWriter writer = new PrintWriter((Writer)new OutputStreamWriter(output, StandardCharsets.UTF_8), true);){
            writer.append("--" + boundary).append("\r\n");
            writer.append("Content-Disposition: form-data; name=\"foo\"; filename=\"bar\"").append("\r\n");
            writer.append("Content-Type: plain/text; charset=utf8").append("\r\n");
            writer.append("\r\n").flush();
            output.write("hello".getBytes(StandardCharsets.UTF_8));
            output.flush();
            writer.append("\r\n").flush();
            writer.append("--" + boundary + "--").append("\r\n").flush();
        }
        Assert.assertEquals((long)200L, (long)connection.getResponseCode());
        byte[] lastUploadedFileContents = this.testUploadHandler.getLastUploadedFileContents();
        Assert.assertEquals((Object)"hello", (Object)new String(lastUploadedFileContents, StandardCharsets.UTF_8));
    }

    @Test
    public void testMultiPartFormDataWithoutFileUpload() throws Exception {
        String boundary = RestServerEndpointITCase.generateMultiPartBoundary();
        String crlf = "\r\n";
        HttpURLConnection connection = this.openHttpConnectionForUpload(boundary);
        try (OutputStream output = connection.getOutputStream();
             PrintWriter writer = new PrintWriter((Writer)new OutputStreamWriter(output, StandardCharsets.UTF_8), true);){
            writer.append("--" + boundary).append("\r\n");
            writer.append("Content-Disposition: form-data; name=\"foo\"").append("\r\n");
            writer.append("\r\n").flush();
            output.write("test".getBytes(StandardCharsets.UTF_8));
            output.flush();
            writer.append("\r\n").flush();
            writer.append("--" + boundary + "--").append("\r\n").flush();
        }
        Assert.assertEquals((long)400L, (long)connection.getResponseCode());
    }

    @Test
    public void testStaticFileServerHandler() throws Exception {
        File file = this.temporaryFolder.newFile();
        Files.write(file.toPath(), Collections.singletonList("foobar"), new OpenOption[0]);
        URL url = new URL(this.serverEndpoint.getRestBaseUrl() + "/" + file.getName());
        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        connection.setRequestMethod("GET");
        String fileContents = IOUtils.toString((InputStream)connection.getInputStream());
        Assert.assertEquals((Object)"foobar", (Object)fileContents.trim());
    }

    @Test
    public void testVersioning() throws Exception {
        CompletableFuture unspecifiedVersionResponse = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionHeaders.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
        unspecifiedVersionResponse.get(5L, TimeUnit.SECONDS);
        CompletableFuture specifiedVersionResponse = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionHeaders.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), RestAPIVersion.V1);
        specifiedVersionResponse.get(5L, TimeUnit.SECONDS);
    }

    @Test
    public void testVersionSelection() throws Exception {
        CompletableFuture version1Response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionSelectionHeaders1.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), RestAPIVersion.V0);
        try {
            version1Response.get(5L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (ExecutionException ee) {
            RestClientException rce = (RestClientException)ee.getCause();
            Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)rce.getHttpResponseStatus());
        }
        CompletableFuture version2Response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionSelectionHeaders2.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), RestAPIVersion.V1);
        try {
            version2Response.get(5L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (ExecutionException ee) {
            RestClientException rce = (RestClientException)ee.getCause();
            Assert.assertEquals((Object)HttpResponseStatus.ACCEPTED, (Object)rce.getHttpResponseStatus());
        }
    }

    @Test
    public void testDefaultVersionRouting() throws Exception {
        Assume.assumeFalse((String)"Ignoring SSL-enabled test to keep OkHttp usage simple.", (boolean)this.config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url(this.serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL()).build();
        try (Response response = client.newCall(request).execute();){
            Assert.assertEquals((long)HttpResponseStatus.ACCEPTED.code(), (long)response.code());
        }
    }

    @Test
    public void testNonSslRedirectForEnabledSsl() throws Exception {
        Assume.assumeTrue((boolean)this.config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
        OkHttpClient client = new OkHttpClient.Builder().followRedirects(false).build();
        String httpsUrl = this.serverEndpoint.getRestBaseUrl() + "/path";
        String httpUrl = httpsUrl.replace("https://", "http://");
        Request request = new Request.Builder().url(httpUrl).build();
        try (Response response = client.newCall(request).execute();){
            Assert.assertEquals((long)HttpResponseStatus.MOVED_PERMANENTLY.code(), (long)response.code());
            Assert.assertThat((Object)response.headers().names(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"Location"}));
            Assert.assertEquals((Object)httpsUrl, (Object)response.header("Location"));
        }
    }

    @Test
    public void testShouldWaitForHandlersWhenClosing() throws Exception {
        HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
        this.testHandler.closeFuture = new CompletableFuture();
        this.testHandler.handlerBody = id -> CompletableFuture.supplyAsync(() -> {
            handlerBlocker.arriveAndBlock();
            return new TestResponse((int)id);
        });
        CompletableFuture closeRestServerEndpointFuture = this.serverEndpoint.closeAsync();
        Assert.assertThat((Object)closeRestServerEndpointFuture.isDone(), (Matcher)Matchers.is((Object)false));
        CompletableFuture<TestResponse> request = this.sendRequestToTestHandler(new TestRequest(1));
        handlerBlocker.awaitRequestToArrive();
        this.testHandler.closeFuture.complete(null);
        Assert.assertThat((Object)closeRestServerEndpointFuture.isDone(), (Matcher)Matchers.is((Object)false));
        handlerBlocker.unblockRequest();
        request.get(timeout.getSize(), timeout.getUnit());
        closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestsRejectedAfterShutdownOfHandlerIsCompleted() throws Exception {
        this.testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse((int)id, "foobar"));
        this.testUploadHandler.closeFuture = new CompletableFuture();
        CompletableFuture closeRestServerEndpointFuture = this.serverEndpoint.closeAsync();
        Assert.assertThat((Object)closeRestServerEndpointFuture.isDone(), (Matcher)Matchers.is((Object)false));
        this.testHandler.closeLatch.await();
        CompletableFuture<TestResponse> request = this.sendRequestToTestHandler(new TestRequest(1));
        try {
            request.get(timeout.getSize(), timeout.getUnit());
            Assert.fail((String)"Expected a ConnectionClosedException");
        }
        catch (ExecutionException ee) {
            if (!ExceptionUtils.findThrowable((Throwable)ee, ConnectionClosedException.class).isPresent()) {
                throw ee;
            }
        }
        finally {
            this.testUploadHandler.closeFuture.complete(null);
            closeRestServerEndpointFuture.get();
        }
    }

    @Test
    public void testRestServerBindPort() throws Exception {
        int portRangeStart = 52300;
        int portRangeEnd = 52400;
        Configuration config = new Configuration();
        config.setString(RestOptions.ADDRESS, "localhost");
        config.setString(RestOptions.BIND_PORT, "52300-52400");
        RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration((Configuration)config);
        try (TestRestServerEndpoint serverEndpoint1 = new TestRestServerEndpoint(serverConfig, Collections.emptyList());
             TestRestServerEndpoint serverEndpoint2 = new TestRestServerEndpoint(serverConfig, Collections.emptyList());){
            serverEndpoint1.start();
            serverEndpoint2.start();
            Assert.assertNotEquals((long)serverEndpoint1.getServerAddress().getPort(), (long)serverEndpoint2.getServerAddress().getPort());
            Assert.assertThat((Object)serverEndpoint1.getServerAddress().getPort(), (Matcher)Matchers.is((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(52300))));
            Assert.assertThat((Object)serverEndpoint1.getServerAddress().getPort(), (Matcher)Matchers.is((Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(52400))));
            Assert.assertThat((Object)serverEndpoint2.getServerAddress().getPort(), (Matcher)Matchers.is((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(52300))));
            Assert.assertThat((Object)serverEndpoint2.getServerAddress().getPort(), (Matcher)Matchers.is((Matcher)Matchers.lessThanOrEqualTo((Comparable)Integer.valueOf(52400))));
        }
    }

    @Test
    public void testEndpointsMustBeUnique() throws Exception {
        RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration((Configuration)this.config);
        List<Tuple2> handlers = Arrays.asList(Tuple2.of((Object)new TestHeaders(), (Object)((Object)this.testHandler)), Tuple2.of((Object)new TestHeaders(), (Object)((Object)this.testUploadHandler)));
        CommonTestUtils.assertThrows((String)"REST handler registration", FlinkRuntimeException.class, () -> {
            try (TestRestServerEndpoint restServerEndpoint = new TestRestServerEndpoint(serverConfig, handlers);){
                restServerEndpoint.start();
                Object var4_4 = null;
                return var4_4;
            }
        });
    }

    @Test
    public void testDuplicateHandlerRegistrationIsForbidden() throws Exception {
        RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration((Configuration)this.config);
        List<Tuple2> handlers = Arrays.asList(Tuple2.of((Object)new TestHeaders(), (Object)((Object)this.testHandler)), Tuple2.of((Object)((Object)TestUploadHeaders.INSTANCE), (Object)((Object)this.testHandler)));
        CommonTestUtils.assertThrows((String)"Duplicate REST handler", FlinkRuntimeException.class, () -> {
            try (TestRestServerEndpoint restServerEndpoint = new TestRestServerEndpoint(serverConfig, handlers);){
                restServerEndpoint.start();
                Object var4_4 = null;
                return var4_4;
            }
        });
    }

    private static File getTestResource(String fileName) {
        ClassLoader classLoader = ClassLoader.getSystemClassLoader();
        URL resource = classLoader.getResource(fileName);
        if (resource == null) {
            throw new IllegalArgumentException(String.format("Test resource %s does not exist", fileName));
        }
        return new File(resource.getFile());
    }

    private HttpURLConnection openHttpConnectionForUpload(String boundary) throws IOException {
        HttpURLConnection connection = (HttpURLConnection)new URL(this.serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
        return connection;
    }

    private static String generateMultiPartBoundary() {
        return Long.toHexString(System.currentTimeMillis());
    }

    private static String createStringOfSize(int size) {
        StringBuilder sb = new StringBuilder(size);
        for (int i = 0; i < size; ++i) {
            sb.append('a');
        }
        return sb.toString();
    }

    private CompletableFuture<TestResponse> sendRequestToTestHandler(TestRequest testRequest) {
        try {
            return this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)new TestHeaders(), (MessageParameters)RestServerEndpointITCase.createTestParameters(), (RequestBody)testRequest);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static TestParameters createTestParameters() {
        TestParameters parameters = new TestParameters();
        parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
        parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        return parameters;
    }

    private static enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
    {
        INSTANCE;


        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/upload";
        }

        public String getDescription() {
            return "";
        }

        public boolean acceptsFileUploads() {
            return true;
        }
    }

    private static class TestVersionSelectionHandler2
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private TestVersionSelectionHandler2(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestVersionSelectionHeaders2.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED);
        }
    }

    private static class TestVersionSelectionHandler1
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private TestVersionSelectionHandler1(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestVersionSelectionHeaders1.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            throw new RestHandlerException("test failure 1", HttpResponseStatus.OK);
        }
    }

    private static enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase
    {
        INSTANCE;


        public Collection<RestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RestAPIVersion.V1);
        }
    }

    private static enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase
    {
        INSTANCE;


        public Collection<RestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RestAPIVersion.V0);
        }
    }

    private static interface TestVersionSelectionHeadersBase
    extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        default public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        default public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        default public String getTargetRestEndpointURL() {
            return "/test/select-version";
        }

        default public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        default public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        default public String getDescription() {
            return null;
        }

        default public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }
    }

    static enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
    {
        INSTANCE;


        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/test/versioning";
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return null;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public Collection<RestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RestAPIVersion.V1);
        }
    }

    static class TestVersionHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        TestVersionHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestVersionHeaders.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }
    }

    private static class TestUploadHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private volatile CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
        private volatile byte[] lastUploadedFileContents;

        private TestUploadHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestUploadHeaders.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            Collection uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
            if (uploadedFiles.size() != 1) {
                throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
            }
            try {
                this.lastUploadedFileContents = Files.readAllBytes((Path)uploadedFiles.iterator().next());
            }
            catch (IOException e) {
                throw new RestHandlerException("Could not read contents of uploaded file.", HttpResponseStatus.INTERNAL_SERVER_ERROR, (Throwable)e);
            }
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }

        public byte[] getLastUploadedFileContents() {
            return this.lastUploadedFileContents;
        }

        protected CompletableFuture<Void> closeHandlerAsync() {
            return this.closeFuture;
        }
    }

    static class JobIDQueryParameter
    extends MessageQueryParameter<JobID> {
        JobIDQueryParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY, MessageParameter.MessageParameterRequisiteness.MANDATORY);
        }

        public JobID convertStringToValue(String value) {
            return JobID.fromHexString((String)value);
        }

        public String convertValueToString(JobID value) {
            return value.toString();
        }

        public String getDescription() {
            return "query JobID parameter";
        }
    }

    static class FaultyJobIDPathParameter
    extends MessagePathParameter<JobID> {
        FaultyJobIDPathParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY);
        }

        protected JobID convertFromString(String value) throws ConversionException {
            return JobID.fromHexString((String)value);
        }

        protected String convertToString(JobID value) {
            return "foobar";
        }

        public String getDescription() {
            return "faulty JobID parameter";
        }
    }

    static class JobIDPathParameter
    extends MessagePathParameter<JobID> {
        JobIDPathParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY);
        }

        public JobID convertFromString(String value) {
            return JobID.fromHexString((String)value);
        }

        protected String convertToString(JobID value) {
            return value.toString();
        }

        public String getDescription() {
            return "correct JobID parameter";
        }
    }

    private static class FaultyTestParameters
    extends TestParameters {
        private final FaultyJobIDPathParameter faultyJobIDPathParameter = new FaultyJobIDPathParameter();

        private FaultyTestParameters() {
        }

        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.faultyJobIDPathParameter);
        }
    }

    private static class TestParameters
    extends MessageParameters {
        private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
        private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();

        private TestParameters() {
        }

        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.jobIDPathParameter);
        }

        public Collection<MessageQueryParameter<?>> getQueryParameters() {
            return Collections.singleton(this.jobIDQueryParameter);
        }
    }

    private static class TestHeaders
    implements MessageHeaders<TestRequest, TestResponse, TestParameters> {
        private TestHeaders() {
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/test/:jobid";
        }

        public Class<TestRequest> getRequestClass() {
            return TestRequest.class;
        }

        public Class<TestResponse> getResponseClass() {
            return TestResponse.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        public TestParameters getUnresolvedMessageParameters() {
            return new TestParameters();
        }
    }

    private static class TestResponse
    implements ResponseBody {
        public final int id;
        public final String content;

        public TestResponse(int id) {
            this(id, null);
        }

        @JsonCreator
        public TestResponse(@JsonProperty(value="id") int id, @JsonProperty(value="content") String content) {
            this.id = id;
            this.content = content;
        }
    }

    private static class TestRequest
    implements RequestBody {
        public final int id;
        public final String content;

        public TestRequest(int id) {
            this(id, null);
        }

        @JsonCreator
        public TestRequest(@JsonProperty(value="id") int id, @JsonProperty(value="content") String content) {
            this.id = id;
            this.content = content;
        }
    }

    static class TestRestClient
    extends RestClient {
        TestRestClient(RestClientConfiguration configuration) {
            super(configuration, (Executor)TestingUtils.defaultExecutor());
        }
    }

    private static class HandlerBlocker {
        private final Time timeout;
        private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
        private final CountDownLatch finishRequestLatch = new CountDownLatch(1);

        private HandlerBlocker(Time timeout) {
            this.timeout = (Time)Preconditions.checkNotNull((Object)timeout);
        }

        public void awaitRequestToArrive() {
            try {
                Assert.assertTrue((boolean)this.requestArrivedLatch.await(this.timeout.getSize(), this.timeout.getUnit()));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void arriveAndBlock() {
            this.markRequestArrived();
            try {
                Assert.assertTrue((boolean)this.finishRequestLatch.await(this.timeout.getSize(), this.timeout.getUnit()));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void unblockRequest() {
            this.finishRequestLatch.countDown();
        }

        private void markRequestArrived() {
            this.requestArrivedLatch.countDown();
        }
    }

    private static class TestHandler
    extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
        private final OneShotLatch closeLatch = new OneShotLatch();
        private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
        private Function<Integer, CompletableFuture<TestResponse>> handlerBody;

        TestHandler(GatewayRetriever<RestfulGateway> leaderRetriever, Time timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)new TestHeaders());
        }

        protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) {
            Assert.assertEquals((Object)request.getPathParameter(JobIDPathParameter.class), (Object)PATH_JOB_ID);
            Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), (Object)QUERY_JOB_ID);
            int id = ((TestRequest)request.getRequestBody()).id;
            return this.handlerBody.apply(id);
        }

        public CompletableFuture<Void> closeHandlerAsync() {
            this.closeLatch.trigger();
            return this.closeFuture;
        }
    }

    static class TestRestServerEndpoint
    extends RestServerEndpoint {
        private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;

        TestRestServerEndpoint(RestServerEndpointConfiguration configuration, List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) throws IOException {
            super(configuration);
            this.handlers = Objects.requireNonNull(handlers);
        }

        protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> localAddressFuture) {
            return this.handlers;
        }

        protected void startInternal() {
        }
    }
}

