/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.async;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders;
import org.apache.flink.runtime.rest.handler.async.OperationKey;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AbstractAsynchronousOperationHandlersTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private static final TestingRestfulGateway DUMMY_GATEWAY = new TestingRestfulGateway();
    private TestingAsynchronousOperationHandlers.TestingTriggerHandler testingTriggerHandler;
    private TestingAsynchronousOperationHandlers.TestingStatusHandler testingStatusHandler;

    @Before
    public void setup() {
        TestingAsynchronousOperationHandlers testingAsynchronousOperationHandlers = new TestingAsynchronousOperationHandlers();
        this.testingTriggerHandler = testingAsynchronousOperationHandlers.new TestingAsynchronousOperationHandlers.TestingTriggerHandler((GatewayRetriever<? extends RestfulGateway>)((GatewayRetriever)() -> null), TIMEOUT, Collections.emptyMap(), (MessageHeaders<EmptyRequestBody, TriggerResponse, EmptyMessageParameters>)TestingTriggerMessageHeaders.INSTANCE);
        this.testingStatusHandler = testingAsynchronousOperationHandlers.new TestingAsynchronousOperationHandlers.TestingStatusHandler((GatewayRetriever<? extends RestfulGateway>)((GatewayRetriever)() -> null), TIMEOUT, Collections.emptyMap(), (MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<OperationResult>, TriggerMessageParameters>)TestingStatusMessageHeaders.INSTANCE);
    }

    @Test
    public void testOperationCompletion() throws Exception {
        CompletableFuture<Acknowledge> acknowledgeFuture = new CompletableFuture<Acknowledge>();
        this.testingTriggerHandler.setGatewayCallback((request, gateway) -> acknowledgeFuture);
        TriggerId triggerId = ((TriggerResponse)this.testingTriggerHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.triggerOperationRequest(), DUMMY_GATEWAY).get()).getTriggerId();
        AsynchronousOperationResult operationResult = (AsynchronousOperationResult)this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(triggerId), DUMMY_GATEWAY).get();
        Assert.assertThat((Object)operationResult.queueStatus().getId(), (Matcher)Matchers.is((Object)QueueStatus.inProgress().getId()));
        acknowledgeFuture.complete(Acknowledge.get());
        operationResult = (AsynchronousOperationResult)this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(triggerId), DUMMY_GATEWAY).get();
        Assert.assertThat((Object)operationResult.queueStatus().getId(), (Matcher)Matchers.is((Object)QueueStatus.completed().getId()));
        Assert.assertThat((Object)((OperationResult)operationResult.resource()).value, (Matcher)Matchers.is((Object)Acknowledge.get()));
    }

    @Test
    public void testOperationFailure() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        this.testingTriggerHandler.setGatewayCallback((request, gateway) -> FutureUtils.completedExceptionally((Throwable)testException));
        TriggerId triggerId = ((TriggerResponse)this.testingTriggerHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.triggerOperationRequest(), DUMMY_GATEWAY).get()).getTriggerId();
        AsynchronousOperationResult operationResult = (AsynchronousOperationResult)this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(triggerId), DUMMY_GATEWAY).get();
        Assert.assertThat((Object)operationResult.queueStatus().getId(), (Matcher)Matchers.is((Object)QueueStatus.completed().getId()));
        OperationResult resource = (OperationResult)operationResult.resource();
        Assert.assertThat((Object)resource.throwable, (Matcher)Matchers.is((Object)((Object)testException)));
    }

    @Test
    public void testUnknownTriggerId() throws Exception {
        try {
            this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(new TriggerId()), DUMMY_GATEWAY).get();
            Assert.fail((String)"This should have failed with a RestHandlerException.");
        }
        catch (ExecutionException ee) {
            Optional optionalRestHandlerException = ExceptionUtils.findThrowable((Throwable)ee, RestHandlerException.class);
            Assert.assertThat((Object)optionalRestHandlerException.isPresent(), (Matcher)Matchers.is((Object)true));
            RestHandlerException restHandlerException = (RestHandlerException)((Object)optionalRestHandlerException.get());
            Assert.assertThat((Object)restHandlerException.getMessage(), (Matcher)Matchers.containsString((String)"Operation not found"));
            Assert.assertThat((Object)restHandlerException.getHttpResponseStatus(), (Matcher)Matchers.is((Object)HttpResponseStatus.NOT_FOUND));
        }
    }

    @Test
    public void testCloseShouldFinishOnFirstServedResult() throws Exception {
        CompletableFuture<Acknowledge> acknowledgeFuture = new CompletableFuture<Acknowledge>();
        this.testingTriggerHandler.setGatewayCallback((request, gateway) -> acknowledgeFuture);
        TriggerId triggerId = ((TriggerResponse)this.testingTriggerHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.triggerOperationRequest(), DUMMY_GATEWAY).get()).getTriggerId();
        CompletableFuture closeFuture = this.testingStatusHandler.closeAsync();
        this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(triggerId), DUMMY_GATEWAY).get();
        Assert.assertThat((Object)closeFuture.isDone(), (Matcher)Matchers.is((Object)false));
        acknowledgeFuture.complete(Acknowledge.get());
        this.testingStatusHandler.handleRequest(AbstractAsynchronousOperationHandlersTest.statusOperationRequest(triggerId), DUMMY_GATEWAY).get();
        Assert.assertThat((Object)closeFuture.isDone(), (Matcher)Matchers.is((Object)true));
    }

    private static HandlerRequest<EmptyRequestBody> triggerOperationRequest() {
        return HandlerRequest.create((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)EmptyMessageParameters.getInstance());
    }

    private static HandlerRequest<EmptyRequestBody> statusOperationRequest(TriggerId triggerId) throws HandlerRequestException {
        return HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new TriggerMessageParameters(), Collections.singletonMap("triggerid", triggerId.toString()), Collections.emptyMap(), Collections.emptyList());
    }

    private static final class TestingAsynchronousOperationHandlers
    extends AbstractAsynchronousOperationHandlers<TestOperationKey, Acknowledge> {
        protected TestingAsynchronousOperationHandlers() {
            super((Duration)RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
        }

        class TestingStatusHandler
        extends AbstractAsynchronousOperationHandlers.StatusHandler<RestfulGateway, OperationResult, TriggerMessageParameters> {
            protected TestingStatusHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<OperationResult>, TriggerMessageParameters> messageHeaders) {
                super((AbstractAsynchronousOperationHandlers)TestingAsynchronousOperationHandlers.this, leaderRetriever, timeout, responseHeaders, messageHeaders);
            }

            protected TestOperationKey getOperationKey(HandlerRequest<EmptyRequestBody> request) {
                TriggerId triggerId = (TriggerId)request.getPathParameter(TriggerIdPathParameter.class);
                return new TestOperationKey(triggerId);
            }

            protected OperationResult exceptionalOperationResultResponse(Throwable throwable) {
                return new OperationResult(null, throwable);
            }

            protected OperationResult operationResultResponse(Acknowledge operationResult) {
                return new OperationResult(operationResult, null);
            }
        }

        class TestingTriggerHandler
        extends AbstractAsynchronousOperationHandlers.TriggerHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
            private BiFunction<HandlerRequest<EmptyRequestBody>, RestfulGateway, CompletableFuture<Acknowledge>> gatewayCallback;

            protected TestingTriggerHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, TriggerResponse, EmptyMessageParameters> messageHeaders) {
                super((AbstractAsynchronousOperationHandlers)TestingAsynchronousOperationHandlers.this, leaderRetriever, timeout, responseHeaders, messageHeaders);
                this.gatewayCallback = (handlerRequest, restfulGateway) -> {
                    throw new UnsupportedOperationException();
                };
            }

            public void setGatewayCallback(BiFunction<HandlerRequest<EmptyRequestBody>, RestfulGateway, CompletableFuture<Acknowledge>> callback) {
                this.gatewayCallback = callback;
            }

            protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody> request, RestfulGateway gateway) throws RestHandlerException {
                return this.gatewayCallback.apply(request, gateway);
            }

            protected TestOperationKey createOperationKey(HandlerRequest<EmptyRequestBody> request) {
                return new TestOperationKey(new TriggerId());
            }
        }
    }

    private static final class TestingStatusMessageHeaders
    extends AsynchronousOperationStatusMessageHeaders<OperationResult, TriggerMessageParameters> {
        private static final TestingStatusMessageHeaders INSTANCE = new TestingStatusMessageHeaders();

        private TestingStatusMessageHeaders() {
        }

        public Class<OperationResult> getValueClass() {
            return OperationResult.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public TriggerMessageParameters getUnresolvedMessageParameters() {
            return new TriggerMessageParameters();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "foobar";
        }

        public String getDescription() {
            return "";
        }
    }

    private static final class TestingTriggerMessageHeaders
    extends AsynchronousOperationTriggerMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
        static final TestingTriggerMessageHeaders INSTANCE = new TestingTriggerMessageHeaders();

        private TestingTriggerMessageHeaders() {
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        protected String getAsyncOperationDescription() {
            return "";
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "barfoo";
        }
    }

    private static final class OperationResult {
        @Nullable
        private final Throwable throwable;
        @Nullable
        private final Acknowledge value;

        OperationResult(@Nullable Acknowledge value, @Nullable Throwable throwable) {
            this.value = value;
            this.throwable = throwable;
        }
    }

    private static final class TriggerMessageParameters
    extends MessageParameters {
        private final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();

        private TriggerMessageParameters() {
        }

        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.triggerIdPathParameter);
        }

        public Collection<MessageQueryParameter<?>> getQueryParameters() {
            return Collections.emptyList();
        }
    }

    private static final class TestOperationKey
    extends OperationKey {
        protected TestOperationKey(TriggerId triggerId) {
            super(triggerId);
        }
    }
}

