/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.workflow;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
import org.apache.flink.table.gateway.rest.RestAPIITCaseBase;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowResponseBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.EmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowRequestBody;
import org.apache.flink.table.gateway.workflow.EmbeddedRefreshHandler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
import org.apache.flink.table.refresh.RefreshHandler;
import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow;
import org.apache.flink.table.workflow.CreateRefreshWorkflow;
import org.apache.flink.table.workflow.DeleteRefreshWorkflow;
import org.apache.flink.table.workflow.ModifyRefreshWorkflow;
import org.apache.flink.table.workflow.ResumeRefreshWorkflow;
import org.apache.flink.table.workflow.SuspendRefreshWorkflow;
import org.apache.flink.table.workflow.WorkflowException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the embedded workflow scheduler.
 *
 * <p>The tests exercise the scheduler through two entry points: the REST endpoints
 * (create / suspend / resume / delete workflow headers) via {@code sendRequest}, and the
 * {@link EmbeddedWorkflowScheduler} instance created through
 * {@link WorkflowSchedulerFactoryUtil}.
 */
public class EmbeddedSchedulerRelatedITCase extends RestAPIITCaseBase {

    /** Upper bound, in seconds, for every REST future awaited by these tests. */
    private static final long REQUEST_TIMEOUT_SECONDS = 5L;

    private static final EmbeddedSchedulerWorkflowRequestBody NON_EXISTS_WORKFLOW =
            new EmbeddedSchedulerWorkflowRequestBody("non-exists", "default_group");

    private static final ObjectIdentifier MATERIALIZED_TABLE_IDENTIFIER =
            ObjectIdentifier.of("cat", "db", "t1");

    private static final String DESCRIPTION_STATEMENT =
            String.format(
                    "ALTER MATERIALIZED TABLE %s REFRESH",
                    MATERIALIZED_TABLE_IDENTIFIER.asSerializableString());

    private static final String CRON_EXPRESSION = "0 0/1 * * * ?";

    // The workflow name used here is echoed verbatim in the expected error messages of
    // testNonExistsWorkflowByWorkflowSchedulerInterface; keep both in sync.
    private static final EmbeddedRefreshHandler NON_EXISTS_HANDLER =
            new EmbeddedRefreshHandler("non-exits", "default_group");

    private CreateEmbeddedSchedulerWorkflowRequestBody createRequestBody;
    private CreatePeriodicRefreshWorkflow createPeriodicWorkflow;
    private EmbeddedWorkflowScheduler embeddedWorkflowScheduler;

    /** Builds the request fixtures and opens a fresh scheduler before each test. */
    @BeforeEach
    void setup() throws Exception {
        String gatewayRestEndpointURL = String.format("http://%s:%s", targetAddress, port);
        createRequestBody =
                new CreateEmbeddedSchedulerWorkflowRequestBody(
                        MATERIALIZED_TABLE_IDENTIFIER.asSerializableString(),
                        CRON_EXPRESSION,
                        null,
                        null,
                        gatewayRestEndpointURL);
        createPeriodicWorkflow =
                new CreatePeriodicRefreshWorkflow(
                        MATERIALIZED_TABLE_IDENTIFIER,
                        DESCRIPTION_STATEMENT,
                        CRON_EXPRESSION,
                        null,
                        null,
                        gatewayRestEndpointURL);
        embeddedWorkflowScheduler = createScheduler();
        embeddedWorkflowScheduler.open();
    }

    /** Closes the scheduler opened by {@link #setup()}. */
    @AfterEach
    void cleanup() throws Exception {
        // Guards against a NullPointerException should this run after setup() failed
        // before the scheduler was assigned.
        if (embeddedWorkflowScheduler != null) {
            embeddedWorkflowScheduler.close();
        }
    }

    /**
     * Creates a workflow over REST, checks the returned name/group, verifies that a second
     * identical create request is rejected, and finally deletes the workflow.
     */
    @Test
    void testCreateWorkflow() throws Exception {
        CreateEmbeddedSchedulerWorkflowResponseBody createResponse =
                sendRequest(
                                CreateEmbeddedSchedulerWorkflowHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                createRequestBody)
                        .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        try {
            Assertions.assertThat(createResponse.getWorkflowName())
                    .isEqualTo(
                            "quartz_job_" + MATERIALIZED_TABLE_IDENTIFIER.asSerializableString());
            Assertions.assertThat(createResponse.getWorkflowGroup()).isEqualTo("default_group");

            CompletableFuture<?> repeatedCreateFuture =
                    sendRequest(
                            CreateEmbeddedSchedulerWorkflowHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            createRequestBody);
            assertFailsWithInternalServerError(
                    repeatedCreateFuture,
                    "Materialized table `cat`.`db`.`t1` quartz schedule job already exist, job info: default_group.quartz_job_`cat`.`db`.`t1`.");
        } finally {
            // Always remove the created workflow so a failed assertion above does not
            // leave a job behind for the other tests that use the same table identifier.
            EmbeddedSchedulerWorkflowRequestBody deleteRequestBody =
                    new EmbeddedSchedulerWorkflowRequestBody(
                            createResponse.getWorkflowName(), createResponse.getWorkflowGroup());
            CompletableFuture<?> deleteFuture =
                    sendRequest(
                            DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(),
                            EmptyMessageParameters.getInstance(),
                            deleteRequestBody);
            FlinkAssertions.assertThatFuture(deleteFuture)
                    .succeedsWithin(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
    }

    /** Suspending an unknown workflow over REST must fail with INTERNAL_SERVER_ERROR. */
    @Test
    void testSuspendNonExistsWorkflow() throws Exception {
        CompletableFuture<?> suspendFuture =
                sendRequest(
                        SuspendEmbeddedSchedulerWorkflowHeaders.getInstance(),
                        EmptyMessageParameters.getInstance(),
                        NON_EXISTS_WORKFLOW);
        assertFailsWithInternalServerError(
                suspendFuture,
                "Failed to suspend a non-existent quartz schedule job: default_group.non-exists");
    }

    /** Resuming an unknown workflow over REST must fail with INTERNAL_SERVER_ERROR. */
    @Test
    void testResumeNonExistsWorkflow() throws Exception {
        ResumeEmbeddedSchedulerWorkflowRequestBody resumeRequestBody =
                new ResumeEmbeddedSchedulerWorkflowRequestBody(
                        NON_EXISTS_WORKFLOW.getWorkflowName(),
                        NON_EXISTS_WORKFLOW.getWorkflowGroup(),
                        null);
        CompletableFuture<?> resumeFuture =
                sendRequest(
                        ResumeEmbeddedSchedulerWorkflowHeaders.getInstance(),
                        EmptyMessageParameters.getInstance(),
                        resumeRequestBody);
        assertFailsWithInternalServerError(
                resumeFuture,
                "Failed to resume a non-existent quartz schedule job: default_group.non-exists");
    }

    /** Deleting an unknown workflow over REST is expected to complete successfully. */
    @Test
    void testDeleteNonExistsWorkflow() throws Exception {
        CompletableFuture<?> deleteFuture =
                sendRequest(
                        DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(),
                        EmptyMessageParameters.getInstance(),
                        NON_EXISTS_WORKFLOW);
        FlinkAssertions.assertThatFuture(deleteFuture)
                .succeedsWithin(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Drives the full create / duplicate-create / suspend / resume / delete cycle through the
     * {@link EmbeddedWorkflowScheduler} instance instead of raw REST requests.
     */
    @Test
    void testCreateWorkflowByWorkflowSchedulerInterface() throws Exception {
        EmbeddedRefreshHandler actual =
                embeddedWorkflowScheduler.createRefreshWorkflow(createPeriodicWorkflow);
        try {
            EmbeddedRefreshHandler expected =
                    new EmbeddedRefreshHandler(
                            "quartz_job_" + MATERIALIZED_TABLE_IDENTIFIER.asSerializableString(),
                            "default_group");
            Assertions.assertThat(actual).isEqualTo(expected);

            Assertions.assertThatThrownBy(
                            () ->
                                    embeddedWorkflowScheduler.createRefreshWorkflow(
                                            createPeriodicWorkflow))
                    .isInstanceOf(WorkflowException.class)
                    .hasMessage(
                            "Failed to create periodic refresh workflow for materialized table `cat`.`db`.`t1`.");

            SuspendRefreshWorkflow<EmbeddedRefreshHandler> suspendRefreshWorkflow =
                    new SuspendRefreshWorkflow<>(actual);
            embeddedWorkflowScheduler.modifyRefreshWorkflow(suspendRefreshWorkflow);

            ResumeRefreshWorkflow<EmbeddedRefreshHandler> resumeRefreshWorkflow =
                    new ResumeRefreshWorkflow<>(actual, Collections.emptyMap());
            embeddedWorkflowScheduler.modifyRefreshWorkflow(resumeRefreshWorkflow);
        } finally {
            // Delete even when an assertion above fails, so the job does not outlive this test.
            DeleteRefreshWorkflow<EmbeddedRefreshHandler> deleteRefreshWorkflow =
                    new DeleteRefreshWorkflow<>(actual);
            embeddedWorkflowScheduler.deleteRefreshWorkflow(deleteRefreshWorkflow);
        }
    }

    /** An unknown {@link CreateRefreshWorkflow} implementation must be rejected. */
    @Test
    void testCreateWorkflowWithUnsupportedTypeByWorkflowSchedulerInterface() {
        Assertions.assertThatThrownBy(
                        () ->
                                embeddedWorkflowScheduler.createRefreshWorkflow(
                                        new UnsupportedCreateRefreshWorkflow()))
                .isInstanceOf(WorkflowException.class)
                .hasMessage(
                        "Unsupported create refresh workflow type UnsupportedCreateRefreshWorkflow.");
    }

    /** An unknown {@link ModifyRefreshWorkflow} implementation must be rejected. */
    @Test
    void testModifyWorkflowWithUnsupportedTypeByWorkflowSchedulerInterface() {
        Assertions.assertThatThrownBy(
                        () ->
                                embeddedWorkflowScheduler.modifyRefreshWorkflow(
                                        new UnsupportedModifyRefreshWorkflow()))
                .isInstanceOf(WorkflowException.class)
                .hasMessage(
                        "Unsupported modify refresh workflow type UnsupportedModifyRefreshWorkflow.");
    }

    /**
     * Suspend and resume of an unknown workflow must raise {@link WorkflowException}; delete of
     * an unknown workflow is expected to return normally.
     */
    @Test
    void testNonExistsWorkflowByWorkflowSchedulerInterface() throws WorkflowException {
        Assertions.assertThatThrownBy(
                        () ->
                                embeddedWorkflowScheduler.modifyRefreshWorkflow(
                                        new SuspendRefreshWorkflow<>(NON_EXISTS_HANDLER)))
                .isInstanceOf(WorkflowException.class)
                .hasMessage(
                        "Failed to suspend refresh workflow {\n  workflowName: non-exits,\n  workflowGroup: default_group\n}.");

        Assertions.assertThatThrownBy(
                        () ->
                                embeddedWorkflowScheduler.modifyRefreshWorkflow(
                                        new ResumeRefreshWorkflow<>(
                                                NON_EXISTS_HANDLER, Collections.emptyMap())))
                .isInstanceOf(WorkflowException.class)
                .hasMessage(
                        "Failed to resume refresh workflow {\n  workflowName: non-exits,\n  workflowGroup: default_group\n}.");

        embeddedWorkflowScheduler.deleteRefreshWorkflow(
                new DeleteRefreshWorkflow<>(NON_EXISTS_HANDLER));
    }

    /** Closing a scheduler that was never opened must not throw. */
    @Test
    void testCloseSchedulerWithoutOpen() throws Exception {
        EmbeddedWorkflowScheduler unopenedScheduler = createScheduler();
        unopenedScheduler.close();
    }

    /**
     * Creates (but does not open) an embedded workflow scheduler configured with the REST
     * address and port of the gateway under test.
     *
     * @return a new, unopened scheduler instance
     */
    private EmbeddedWorkflowScheduler createScheduler() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(FactoryUtil.WORKFLOW_SCHEDULER_TYPE, "embedded");
        configuration.setString("sql-gateway.endpoint.rest.address", targetAddress);
        configuration.setString("sql-gateway.endpoint.rest.port", String.valueOf(port));
        return (EmbeddedWorkflowScheduler)
                WorkflowSchedulerFactoryUtil.createWorkflowScheduler(
                        configuration, EmbeddedSchedulerRelatedITCase.class.getClassLoader());
    }

    /**
     * Asserts that {@code future} fails within the request timeout with an
     * {@link ExecutionException} caused by a {@link RestClientException}, that the failure
     * message contains {@code expectedMessage}, and that the REST status is
     * {@code INTERNAL_SERVER_ERROR}.
     *
     * @param future the REST response future expected to fail
     * @param expectedMessage text that the failure message must contain
     */
    private static void assertFailsWithInternalServerError(
            CompletableFuture<?> future, String expectedMessage) {
        FlinkAssertions.assertThatFuture(future)
                .failsWithin(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(RestClientException.class)
                .withMessageContaining(expectedMessage)
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ((RestClientException) e.getCause())
                                                        .getHttpResponseStatus())
                                        .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR));
    }

    /** A {@link ModifyRefreshWorkflow} type the scheduler is not expected to recognize. */
    private static class UnsupportedModifyRefreshWorkflow
            implements ModifyRefreshWorkflow<EmbeddedRefreshHandler> {
        private UnsupportedModifyRefreshWorkflow() {
        }

        @Override
        public EmbeddedRefreshHandler getRefreshHandler() {
            return new EmbeddedRefreshHandler("a", "b");
        }
    }

    /** A {@link CreateRefreshWorkflow} type the scheduler is not expected to recognize. */
    private static class UnsupportedCreateRefreshWorkflow implements CreateRefreshWorkflow {
        private UnsupportedCreateRefreshWorkflow() {
        }
    }
}

