package org.apache.flink.table.gateway.service;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.BiFunction;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.class */
public class SqlGatewayServiceStatementITCase extends AbstractSqlGatewayStatementITCase {
    private final SessionEnvironment defaultSessionEnvironment = SessionEnvironment.newBuilder().setSessionEndpointVersion(MockedEndpointVersion.V1).build();
    private SessionHandle sessionHandle;

    /* loaded from: input_file:org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase$RowDataIterator.class */
    private static class RowDataIterator implements Iterator<RowData> {
        private final SessionHandle sessionHandle;
        private final OperationHandle operationHandle;
        private Long token = 0L;
        private Iterator<RowData> fetchedRows = Collections.emptyIterator();

        public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) {
            this.sessionHandle = sessionHandle;
            this.operationHandle = operationHandle;
            fetch();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            while (this.token != null && !this.fetchedRows.hasNext()) {
                fetch();
            }
            return this.token != null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public RowData next() {
            return this.fetchedRows.next();
        }

        private void fetch() {
            ResultSet fetchResults = SqlGatewayServiceStatementITCase.service.fetchResults(this.sessionHandle, this.operationHandle, this.token.longValue(), Integer.MAX_VALUE);
            this.token = fetchResults.getNextToken();
            this.fetchedRows = fetchResults.getData().iterator();
        }
    }

    @Override // org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase
    @BeforeEach
    public void before(@TempDir Path path) throws Exception {
        super.before(path);
        this.sessionHandle = service.openSession(this.defaultSessionEnvironment);
    }

    @Override // org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase
    protected String runSingleStatement(String str) throws Exception {
        OperationHandle executeStatement = service.executeStatement(this.sessionHandle, str, -1L, new Configuration());
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(service.getOperationInfo(this.sessionHandle, executeStatement).getStatus().isTerminalStatus());
        }, Duration.ofSeconds(100L), "Failed to wait operation finish.");
        ResultSet fetchResults = service.fetchResults(this.sessionHandle, executeStatement, 0L, Integer.MAX_VALUE);
        return toString(AbstractSqlGatewayStatementITCase.StatementType.match(str), fetchResults.getResultSchema(), new RowDataToStringConverterImpl(fetchResults.getResultSchema().toPhysicalRowDataType(), DateTimeUtils.UTC_ZONE.toZoneId(), SqlGatewayServiceStatementITCase.class.getClassLoader(), false), new RowDataIterator(this.sessionHandle, executeStatement));
    }

    @Override // org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase
    protected String stringifyException(Throwable th) {
        Throwable th2;
        Throwable th3 = th;
        while (true) {
            th2 = th3;
            if (th2.getCause() == null || th2.getCause().getMessage() == null || th2.getCause().getMessage().isEmpty()) {
                break;
            }
            th3 = th2.getCause();
        }
        return th2.getClass().getName() + ": " + th2.getMessage();
    }

    @Override // org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase
    protected boolean isStreaming() {
        return ((RuntimeExecutionMode) Configuration.fromMap(service.getSessionConfig(this.sessionHandle)).get(ExecutionOptions.RUNTIME_MODE)).equals(RuntimeExecutionMode.STREAMING);
    }

    @Test
    void testIsQueryResult() throws Exception {
        SessionHandle createInitializedSession = SqlGatewayServiceTestUtil.createInitializedSession(service);
        BiFunction biFunction = (sessionHandle, operationHandle) -> {
            return Boolean.valueOf(SqlGatewayServiceTestUtil.fetchResults(service, sessionHandle, operationHandle).isQueryResult());
        };
        validateResultSetField(createInitializedSession, "SELECT * FROM cat1.db1.tbl1;", biFunction, true);
        validateResultSetField(createInitializedSession, "WITH hub AS (SELECT * FROM cat1.db1.tbl1)\nSELECT * FROM hub;", biFunction, true);
        validateResultSetField(createInitializedSession, "INSERT INTO cat1.db1.tbl1 SELECT * FROM cat1.db1.tbl2;", biFunction, false);
    }

    @Test
    void testHasJobID() throws Exception {
        SessionHandle createInitializedSession = SqlGatewayServiceTestUtil.createInitializedSession(service);
        BiFunction biFunction = (sessionHandle, operationHandle) -> {
            return Boolean.valueOf(SqlGatewayServiceTestUtil.fetchResults(service, sessionHandle, operationHandle).getJobID() != null);
        };
        validateResultSetField(createInitializedSession, "SELECT * FROM cat1.db1.tbl1;", biFunction, true);
        validateResultSetField(createInitializedSession, "INSERT INTO cat1.db1.tbl1 SELECT * FROM cat1.db1.tbl2;", biFunction, true);
        validateResultSetField(createInitializedSession, "CREATE TABLE test (f0 INT) WITH ('connector' = 'values');", biFunction, false);
    }

    @Test
    void testResultKind() throws Exception {
        SessionHandle createInitializedSession = SqlGatewayServiceTestUtil.createInitializedSession(service);
        BiFunction biFunction = (sessionHandle, operationHandle) -> {
            return SqlGatewayServiceTestUtil.fetchResults(service, sessionHandle, operationHandle).getResultKind();
        };
        validateResultSetField(createInitializedSession, "SELECT * FROM cat1.db1.tbl1;", biFunction, ResultKind.SUCCESS_WITH_CONTENT);
        validateResultSetField(createInitializedSession, "INSERT INTO cat1.db1.tbl1 SELECT * FROM cat1.db1.tbl2;", biFunction, ResultKind.SUCCESS_WITH_CONTENT);
        validateResultSetField(createInitializedSession, "CREATE TABLE test (f0 INT) WITH ('connector' = 'values');", biFunction, ResultKind.SUCCESS);
        validateResultSetField(createInitializedSession, "SET 'key' = 'value';", biFunction, ResultKind.SUCCESS);
        validateResultSetField(createInitializedSession, "SET;", biFunction, ResultKind.SUCCESS_WITH_CONTENT);
    }

    private <T> void validateResultSetField(SessionHandle sessionHandle, String str, BiFunction<SessionHandle, OperationHandle, T> biFunction, T t) throws Exception {
        OperationHandle executeStatement = service.executeStatement(sessionHandle, str, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination(service, sessionHandle, executeStatement);
        Assertions.assertThat(biFunction.apply(sessionHandle, executeStatement)).isEqualTo(t);
    }
}
