/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service.application;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCaseBase;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.service.application.Printer;
import org.apache.flink.table.gateway.service.application.ScriptExecutor;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.utils.TestSqlStatement;
import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class ScriptExecutorITCase
extends AbstractSqlGatewayStatementITCaseBase {
    private ScriptExecutor executor;

    @Override
    protected String runStatements(List<TestSqlStatement> statements) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, (Object)"embedded");
        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, (Object)false);
        configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, (Object)true);
        TestingMiniClusterConfiguration clusterConfiguration = TestingMiniClusterConfiguration.newBuilder().setConfiguration(configuration).build();
        try (MiniCluster miniCluster = new MiniCluster((MiniClusterConfiguration)clusterConfiguration);){
            String string;
            try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);){
                miniCluster.start();
                MiniClusterPipelineExecutorServiceLoader loader = new MiniClusterPipelineExecutorServiceLoader(miniCluster);
                StreamContextEnvironment.setAsContext((PipelineExecutorServiceLoader)loader, (Configuration)miniCluster.getConfiguration(), (ClassLoader)ScriptExecutor.class.getClassLoader(), (boolean)false, (boolean)false);
                this.executor = new TestScriptExecutor(SessionContext.create((DefaultContext)new DefaultContext(miniCluster.getConfiguration(), Collections.emptyList()), (SessionHandle)SessionHandle.create(), (SessionEnvironment)SessionEnvironment.newBuilder().setSessionEndpointVersion((EndpointVersion)SqlGatewayRestAPIVersion.getDefaultVersion()).build(), (ExecutorService)Executors.newDirectExecutorService()), new TestPrinter(statements.iterator(), outputStream));
                this.executor.execute(String.join((CharSequence)"\n", statements.stream().map(TestSqlStatement::getSql).collect(Collectors.joining())));
                string = ((Object)outputStream).toString();
            }
            return string;
        }
    }

    @Test
    void testParseStatementWithComments() throws Exception {
        Assertions.assertThat((String)this.runScript("comment.q")).contains(new CharSequence[]{"+------+------+------+-----+--------+-----------+" + System.lineSeparator() + "| name | type | null | key | extras | watermark |" + System.lineSeparator() + "+------+------+------+-----+--------+-----------+" + System.lineSeparator() + "|    a |  INT | TRUE |     |        |           |" + System.lineSeparator() + "|    b |  INT | TRUE |     |        |           |" + System.lineSeparator() + "+------+------+------+-----+--------+-----------+"});
    }

    @Test
    void testParseStatementWithoutSemicolon() throws Exception {
        Assertions.assertThat((String)this.runScript("no_semicolon.q")).contains(new CharSequence[]{"+------+------+------+-----+--------+-----------+" + System.lineSeparator() + "| name | type | null | key | extras | watermark |" + System.lineSeparator() + "+------+------+------+-----+--------+-----------+" + System.lineSeparator() + "|    a |  INT | TRUE |     |        |           |" + System.lineSeparator() + "|    b |  INT | TRUE |     |        |           |" + System.lineSeparator() + "+------+------+------+-----+--------+-----------+"});
    }

    @Test
    void testParseErrorPositionIsCorrect() throws Exception {
        Assertions.assertThat((String)this.runScript("error.q")).contains(new CharSequence[]{"org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered \")\" at line 26, column 1."});
    }

    @Override
    protected boolean isStreaming() {
        return this.executor.context.getSessionConf().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
    }

    @Override
    protected boolean skip() {
        return this.parameters.getSqlPath().contains("set.q");
    }

    private String runScript(String fileName) throws Exception {
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);){
            this.executor = new ScriptExecutor(SessionContext.create((DefaultContext)new DefaultContext(new Configuration(), Collections.emptyList()), (SessionHandle)SessionHandle.create(), (SessionEnvironment)SessionEnvironment.newBuilder().setSessionEndpointVersion((EndpointVersion)SqlGatewayRestAPIVersion.getDefaultVersion()).build(), (ExecutorService)Executors.newDirectExecutorService()), new Printer((OutputStream)outputStream));
            String input = IOUtils.toString((InputStream)((InputStream)Preconditions.checkNotNull((Object)AbstractSqlGatewayStatementITCase.class.getResourceAsStream("/application/" + fileName))), (Charset)StandardCharsets.UTF_8);
            try {
                this.executor.execute(input);
            }
            catch (Exception exception) {
                // empty catch block
            }
            String string = ((Object)outputStream).toString();
            return string;
        }
    }

    private class TestPrinter
    extends Printer {
        private final Iterator<TestSqlStatement> statements;
        private String statement;

        public TestPrinter(Iterator<TestSqlStatement> statements, OutputStream stream) {
            super(stream);
            this.statements = statements;
        }

        public void print(String statement) {
            if (this.statements.hasNext()) {
                this.writer.print(this.statements.next().getComment());
            }
            this.statement = statement;
            this.writer.print(statement);
            this.writer.flush();
        }

        public void print(ResultFetcher fetcher) {
            AbstractSqlGatewayStatementITCaseBase.StatementType statementType = AbstractSqlGatewayStatementITCaseBase.StatementType.match(this.statement);
            try {
                this.writer.print(ScriptExecutorITCase.this.toString(statementType, fetcher.getResultSchema(), fetcher.getConverter(), (Iterator)new Printer.RowDataIterator(fetcher)));
                this.writer.flush();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void print(Throwable t) {
            Throwable root = t;
            while (root.getCause() != null && root.getCause().getMessage() != null && !root.getCause().getMessage().isEmpty()) {
                root = root.getCause();
            }
            this.writer.print(AbstractSqlGatewayStatementITCaseBase.Tag.ERROR.addTag(root.getClass().getName() + ": " + ScriptExecutorITCase.this.removeRowNumber(root.getMessage().trim()) + "\n"));
            this.writer.flush();
        }
    }

    private static class TestScriptExecutor
    extends ScriptExecutor {
        public TestScriptExecutor(SessionContext context, Printer printer) {
            super(context, printer);
        }

        public void execute(String script) {
            ScriptExecutor.ResultIterator iterator = new ScriptExecutor.ResultIterator((ScriptExecutor)this, script);
            while (true) {
                try {
                    while (iterator.hasNext()) {
                        ScriptExecutor.Result result = iterator.next();
                        this.printer.print(result.statement);
                        if (result.error != null) {
                            throw result.error;
                        }
                        this.printer.print((ResultFetcher)Preconditions.checkNotNull((Object)result.fetcher));
                    }
                }
                catch (Throwable t) {
                    this.printer.print(t);
                    continue;
                }
                break;
            }
        }
    }
}

