package org.apache.flink.table.api.internal;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkStagingContext;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.execution.StagingSinkJobStatusHook;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.SourceQueryOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
import org.apache.flink.table.operations.ddl.CompilePlanOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.utils.ExecutableOperationUtils;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/api/internal/TableEnvironmentImpl.class */
public class TableEnvironmentImpl implements TableEnvironmentInternal {
    /**
     * Constant flag that is always {@code true}.
     * NOTE(review): presumably marks table sources/sinks as stream tables - confirm against the original sources.
     */
    private static final boolean IS_STREAM_TABLE = true;
    /** Catalog manager used for catalog, database, table and view lookups and registrations. */
    private final CatalogManager catalogManager;
    /** Module manager backing the load/unload/use/list module calls. */
    private final ModuleManager moduleManager;
    /** Resource manager; supplies the user class loader and handles plan file resources. */
    protected final ResourceManager resourceManager;
    /** Builder for Table API operation trees; created in the constructor. */
    private final OperationTreeBuilder operationTreeBuilder;
    /** Table configuration of this environment. */
    protected final TableConfig tableConfig;
    /** Executor handed to the constructor. */
    protected final Executor execEnv;
    /** Function catalog backing all function registration and lookup calls. */
    protected final FunctionCatalog functionCatalog;
    /** Planner used for parsing, explaining, compiling and loading plans. */
    protected final Planner planner;
    /** Whether this environment was created in streaming mode. */
    private final boolean isStreamingMode;
    /** Context object created in the constructor from the managers, the config and the mode flag. */
    private final ExecutableOperation.Context operationCtx;
    /** Error message for {@code executeSql()} when the input is not exactly one statement. */
    private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG = "Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.";
    /** Error message for {@code compilePlanSql()} when the input is not a single supported statement. */
    private static final String UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG = "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT";

    /**
     * Stores the given collaborators, builds the operation tree builder, initializes the
     * catalog manager's schema resolver and creates the executable-operation context.
     *
     * @param catalogManager catalog manager of this environment
     * @param moduleManager module manager of this environment
     * @param resourceManager resource manager; also provides the user class loader
     * @param tableConfig table configuration
     * @param executor executor stored as {@code execEnv}
     * @param functionCatalog function catalog
     * @param planner planner of this environment
     * @param z streaming-mode flag, stored as {@code isStreamingMode}
     */
    protected TableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, ResourceManager resourceManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean z) {
        this.catalogManager = catalogManager;
        this.moduleManager = moduleManager;
        this.resourceManager = resourceManager;
        this.execEnv = executor;
        this.tableConfig = tableConfig;
        this.functionCatalog = functionCatalog;
        this.planner = planner;
        this.isStreamingMode = z;

        // Looks up a table by its string path; a path that cannot be parsed yields an empty result.
        TableReferenceLookup tableLookup = path -> {
            try {
                UnresolvedIdentifier identifier = getParser().parseIdentifier(path);
                return scanInternal(identifier).map(source -> ApiExpressionUtils.tableRef(path, source));
            } catch (SqlParserException e) {
                return Optional.empty();
            }
        };

        // Arguments are evaluated left to right, matching the original statement order.
        this.operationTreeBuilder =
                OperationTreeBuilder.create(
                        tableConfig,
                        resourceManager.getUserClassLoader(),
                        functionCatalog.asLookup(getParser()::parseIdentifier),
                        catalogManager.getDataTypeFactory(),
                        tableLookup,
                        getParser()::parseSqlExpression,
                        z);
        catalogManager.initSchemaResolver(z, this.operationTreeBuilder.getResolverBuilder(new QueryOperation[0]));
        this.operationCtx = new ExecutableOperationContextImpl(catalogManager, functionCatalog, moduleManager, resourceManager, tableConfig, z);
    }

    /**
     * Creates an environment from a configuration by wrapping it into {@link EnvironmentSettings}.
     *
     * @param configuration configuration passed to the settings builder
     * @return the new table environment
     */
    public static TableEnvironmentImpl create(Configuration configuration) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().withConfiguration(configuration).build();
        return create(settings);
    }

    /**
     * Creates an environment from the given settings: builds the user class loader, discovers the
     * {@code "default"} executor factory, opens the catalog store factory, and assembles the
     * table config, resource/module/catalog managers, function catalog and planner.
     *
     * @param environmentSettings settings providing configuration, class loader, catalog store,
     *     built-in catalog/database names and the streaming-mode flag
     * @return the new table environment
     */
    public static TableEnvironmentImpl create(EnvironmentSettings environmentSettings) {
        ClassLoader userClassLoader = FlinkUserCodeClassLoaders.create(new URL[0], environmentSettings.getUserClassLoader(), environmentSettings.getConfiguration());

        ExecutorFactory executorFactory = (ExecutorFactory) FactoryUtil.discoverFactory(userClassLoader, ExecutorFactory.class, "default");
        Executor executor = executorFactory.create(environmentSettings.getConfiguration());

        CatalogStoreFactory catalogStoreFactory = TableFactoryUtil.findAndCreateCatalogStoreFactory(environmentSettings.getConfiguration(), userClassLoader);
        catalogStoreFactory.open(TableFactoryUtil.buildCatalogStoreFactoryContext(environmentSettings.getConfiguration(), userClassLoader));

        // A catalog store supplied through the settings wins over the one created by the factory.
        CatalogStore catalogStore = environmentSettings.getCatalogStore();
        if (catalogStore == null) {
            catalogStore = catalogStoreFactory.createCatalogStore();
        }

        TableConfig tableConfig = TableConfig.getDefault();
        tableConfig.setRootConfiguration(executor.getConfiguration());
        tableConfig.addConfiguration(environmentSettings.getConfiguration());

        ResourceManager resourceManager = new ResourceManager(environmentSettings.getConfiguration(), userClassLoader);
        ModuleManager moduleManager = new ModuleManager();
        CatalogManager catalogManager =
                CatalogManager.newBuilder()
                        .classLoader(userClassLoader)
                        .config(tableConfig)
                        .defaultCatalog(
                                environmentSettings.getBuiltInCatalogName(),
                                new GenericInMemoryCatalog(environmentSettings.getBuiltInCatalogName(), environmentSettings.getBuiltInDatabaseName()))
                        .catalogModificationListeners(TableFactoryUtil.findCatalogModificationListenerList(environmentSettings.getConfiguration(), userClassLoader))
                        .catalogStoreHolder(
                                CatalogStoreHolder.newBuilder()
                                        .catalogStore(catalogStore)
                                        .factory(catalogStoreFactory)
                                        .config(tableConfig)
                                        .classloader(userClassLoader)
                                        .build())
                        .build();

        FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
        return new TableEnvironmentImpl(
                catalogManager,
                moduleManager,
                resourceManager,
                tableConfig,
                executor,
                functionCatalog,
                PlannerFactoryUtil.createPlanner(executor, tableConfig, userClassLoader, moduleManager, catalogManager, functionCatalog),
                environmentSettings.isStreamingMode());
    }

    /**
     * Creates a table from the given values by delegating to the {@link Iterable} overload.
     *
     * @param objArr the values
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(Object... objArr) {
        List<Object> rows = Arrays.asList(objArr);
        return fromValues(rows);
    }

    /**
     * Creates a table of the given row type from the given values by delegating to the
     * {@link Iterable} overload.
     *
     * @param abstractDataType the expected data type
     * @param objArr the values
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(AbstractDataType<?> abstractDataType, Object... objArr) {
        List<Object> rows = Arrays.asList(objArr);
        return fromValues(abstractDataType, rows);
    }

    /**
     * Creates a table from a values operation built out of the given expressions.
     *
     * @param expressionArr the value expressions
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(Expression... expressionArr) {
        return createTable(
                operationTreeBuilder.values(expressionArr));
    }

    /**
     * Creates a table from a values operation built out of the given expressions, using the data
     * type produced by the catalog manager's data type factory for {@code abstractDataType}.
     *
     * @param abstractDataType the expected data type
     * @param expressionArr the value expressions
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(AbstractDataType<?> abstractDataType, Expression... expressionArr) {
        return createTable(
                operationTreeBuilder.values(
                        catalogManager.getDataTypeFactory().createDataType(abstractDataType),
                        expressionArr));
    }

    /**
     * Converts every element of the iterable to an expression and delegates to the
     * {@link Expression} varargs overload.
     *
     * @param iterable the values
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(Iterable<?> iterable) {
        Expression[] rows =
                StreamSupport.stream(iterable.spliterator(), false)
                        .map(ApiExpressionUtils::objectToExpression)
                        .toArray(Expression[]::new);
        return fromValues(rows);
    }

    /**
     * Converts every element of the iterable to an expression and delegates to the typed
     * {@link Expression} varargs overload.
     *
     * @param abstractDataType the expected data type
     * @param iterable the values
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromValues(AbstractDataType<?> abstractDataType, Iterable<?> iterable) {
        Expression[] rows =
                StreamSupport.stream(iterable.spliterator(), false)
                        .map(ApiExpressionUtils::objectToExpression)
                        .toArray(Expression[]::new);
        return fromValues(abstractDataType, rows);
    }

    /**
     * Exposes the planner of this environment (visible for testing).
     *
     * @return the planner passed to the constructor
     */
    @VisibleForTesting
    public Planner getPlanner() {
        return planner;
    }

    /**
     * Creates a table backed by a {@link TableSourceQueryOperation} over the given source.
     *
     * @param tableSource the table source to read from
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public Table fromTableSource(TableSource<?> tableSource) {
        return createTable(
                new TableSourceQueryOperation(tableSource, false));
    }

    /**
     * Registers the catalog under the given name with the catalog manager.
     *
     * @param str catalog name
     * @param catalog catalog instance
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerCatalog(String str, Catalog catalog) {
        catalogManager.registerCatalog(str, catalog);
    }

    /**
     * Creates a catalog from the descriptor under the given name via the catalog manager.
     *
     * @param str catalog name
     * @param catalogDescriptor descriptor of the catalog to create
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createCatalog(String str, CatalogDescriptor catalogDescriptor) {
        catalogManager.createCatalog(str, catalogDescriptor);
    }

    /**
     * Looks up a catalog by name in the catalog manager.
     *
     * @param str catalog name
     * @return the catalog manager's lookup result
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Optional<Catalog> getCatalog(String str) {
        Optional<Catalog> found = catalogManager.getCatalog(str);
        return found;
    }

    /**
     * Loads the module under the given name via the module manager.
     *
     * @param str module name
     * @param module module instance
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void loadModule(String str, Module module) {
        moduleManager.loadModule(str, module);
    }

    /**
     * Forwards the given module names to the module manager's {@code useModules}.
     *
     * @param strArr module names
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void useModules(String... strArr) {
        moduleManager.useModules(strArr);
    }

    /**
     * Unloads the module with the given name via the module manager.
     *
     * @param str module name
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void unloadModule(String str) {
        moduleManager.unloadModule(str);
    }

    /**
     * Registers a scalar function as a temporary system function in the function catalog.
     *
     * @param str function name
     * @param scalarFunction function instance
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerFunction(String str, ScalarFunction scalarFunction) {
        functionCatalog.registerTempSystemScalarFunction(str, scalarFunction);
    }

    /**
     * Instantiates the function class and registers the instance as a temporary system function.
     *
     * @param str function name
     * @param cls function class to instantiate
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporarySystemFunction(String str, Class<? extends UserDefinedFunction> cls) {
        UserDefinedFunction instance = UserDefinedFunctionHelper.instantiateFunction(cls);
        createTemporarySystemFunction(str, instance);
    }

    /**
     * Registers the function instance as a temporary system function in the function catalog.
     *
     * @param str function name
     * @param userDefinedFunction function instance
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporarySystemFunction(String str, UserDefinedFunction userDefinedFunction) {
        // The cast selects the FunctionDefinition overload of the catalog method.
        functionCatalog.registerTemporarySystemFunction(
                str, (FunctionDefinition) userDefinedFunction, false);
    }

    /**
     * Registers a temporary system function described by a second string and resource URIs.
     *
     * @param str function name
     * @param str2 second string forwarded to the function catalog (presumably the class name - confirm)
     * @param list resource URIs forwarded to the function catalog
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporarySystemFunction(String str, String str2, List<ResourceUri> list) {
        functionCatalog.registerTemporarySystemFunction(str, str2, list);
    }

    /**
     * Drops a temporary system function through the function catalog, passing {@code true} as
     * the catalog method's flag.
     *
     * @param str function name
     * @return the function catalog's result
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public boolean dropTemporarySystemFunction(String str) {
        boolean dropped = functionCatalog.dropTemporarySystemFunction(str, true);
        return dropped;
    }

    /**
     * Delegates to {@link #createFunction(String, Class, boolean)} with the flag set to
     * {@code false}.
     *
     * @param str function path
     * @param cls function class
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createFunction(String str, Class<? extends UserDefinedFunction> cls) {
        final boolean flag = false;
        createFunction(str, cls, flag);
    }

    /**
     * Parses the path into an identifier and registers the class as a catalog function.
     *
     * @param str function path
     * @param cls function class
     * @param z flag forwarded unchanged to the function catalog
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createFunction(String str, Class<? extends UserDefinedFunction> cls, boolean z) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        functionCatalog.registerCatalogFunction(identifier, cls, z);
    }

    /**
     * Delegates to {@link #createFunction(String, String, List, boolean)} with the flag set to
     * {@code false}.
     *
     * @param str function path
     * @param str2 second string forwarded to the four-argument overload
     * @param list resource URIs
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createFunction(String str, String str2, List<ResourceUri> list) {
        final boolean flag = false;
        createFunction(str, str2, list, flag);
    }

    /**
     * Parses the path into an identifier and registers a catalog function from the given string
     * and resource URIs.
     *
     * @param str function path
     * @param str2 second string forwarded to the function catalog
     * @param list resource URIs
     * @param z flag forwarded unchanged to the function catalog
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createFunction(String str, String str2, List<ResourceUri> list, boolean z) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        functionCatalog.registerCatalogFunction(identifier, str2, list, z);
    }

    /**
     * Parses the path and drops the catalog function, passing {@code true} as the catalog
     * method's flag.
     *
     * @param str function path
     * @return the function catalog's result
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public boolean dropFunction(String str) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        return functionCatalog.dropCatalogFunction(identifier, true);
    }

    /**
     * Instantiates the function class and registers the instance as a temporary function.
     *
     * @param str function path
     * @param cls function class to instantiate
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporaryFunction(String str, Class<? extends UserDefinedFunction> cls) {
        UserDefinedFunction instance = UserDefinedFunctionHelper.instantiateFunction(cls);
        createTemporaryFunction(str, instance);
    }

    /**
     * Parses the path and registers the function instance as a temporary catalog function.
     *
     * @param str function path
     * @param userDefinedFunction function instance
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporaryFunction(String str, UserDefinedFunction userDefinedFunction) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        // The cast selects the FunctionDefinition overload of the catalog method.
        functionCatalog.registerTemporaryCatalogFunction(identifier, (FunctionDefinition) userDefinedFunction, false);
    }

    /**
     * Parses the path and registers a temporary catalog function built as a Java-language
     * {@link CatalogFunctionImpl} from the given string and resource URIs.
     *
     * @param str function path
     * @param str2 first constructor argument of {@link CatalogFunctionImpl}
     * @param list resource URIs
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporaryFunction(String str, String str2, List<ResourceUri> list) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        CatalogFunction function = new CatalogFunctionImpl(str2, FunctionLanguage.JAVA, list);
        functionCatalog.registerTemporaryCatalogFunction(identifier, function, false);
    }

    /**
     * Parses the path and drops the temporary catalog function, passing {@code true} as the
     * catalog method's flag.
     *
     * @param str function path
     * @return the function catalog's result
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public boolean dropTemporaryFunction(String str) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        return functionCatalog.dropTemporaryCatalogFunction(identifier, true);
    }

    /**
     * Creates a temporary table from the descriptor under the qualified form of the given path.
     *
     * @param str table path; must not be null
     * @param tableDescriptor table descriptor; must not be null
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporaryTable(String str, TableDescriptor tableDescriptor) {
        Preconditions.checkNotNull(str, "Path must not be null.");
        Preconditions.checkNotNull(tableDescriptor, "Table descriptor must not be null.");
        // The descriptor is converted before the path is parsed; argument order is kept on purpose.
        catalogManager.createTemporaryTable(
                tableDescriptor.toCatalogTable(),
                catalogManager.qualifyIdentifier(getParser().parseIdentifier(str)),
                false);
    }

    /**
     * Creates a catalog table from the descriptor under the qualified form of the given path.
     *
     * @param str table path; must not be null
     * @param tableDescriptor table descriptor; must not be null
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTable(String str, TableDescriptor tableDescriptor) {
        Preconditions.checkNotNull(str, "Path must not be null.");
        Preconditions.checkNotNull(tableDescriptor, "Table descriptor must not be null.");
        // The descriptor is converted before the path is parsed; argument order is kept on purpose.
        catalogManager.createTable(
                tableDescriptor.toCatalogTable(),
                catalogManager.qualifyIdentifier(getParser().parseIdentifier(str)),
                false);
    }

    /**
     * Registers the table as a temporary view, using the name as a single-part identifier
     * (the name is not parsed).
     *
     * @param str view name
     * @param table table to register
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerTable(String str, Table table) {
        UnresolvedIdentifier identifier = UnresolvedIdentifier.of(new String[]{str});
        createTemporaryView(identifier, table);
    }

    /**
     * Parses the path and registers the table as a temporary view under it.
     *
     * @param str view path; must not be null
     * @param table table to register; must not be null
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void createTemporaryView(String str, Table table) {
        Preconditions.checkNotNull(str, "Path must not be null.");
        Preconditions.checkNotNull(table, "Table view must not be null.");
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        createTemporaryView(identifier, table);
    }

    /**
     * Registers the table's query operation as a temporary view under the qualified identifier.
     *
     * @param unresolvedIdentifier identifier of the view
     * @param table table to register; must belong to this environment
     * @throws TableException if the table belongs to a different table environment
     */
    private void createTemporaryView(UnresolvedIdentifier unresolvedIdentifier, Table table) {
        boolean ownedByThisEnvironment = ((TableImpl) table).getTableEnvironment() == this;
        if (!ownedByThisEnvironment) {
            throw new TableException("Only table API objects that belong to this TableEnvironment can be registered.");
        }
        ObjectIdentifier viewIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
        QueryOperationCatalogView view =
                new QueryOperationCatalogView(qualifyQueryOperation(viewIdentifier, table.getQueryOperation()));
        catalogManager.createTemporaryTable(view, viewIdentifier, false);
    }

    /**
     * Looks up a table by its path parts and wraps it into a {@link Table}.
     *
     * @param strArr path parts of the table
     * @return the table
     * @throws ValidationException if no table is found under the path
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table scan(String... strArr) {
        UnresolvedIdentifier identifier = UnresolvedIdentifier.of(strArr);
        Optional<SourceQueryOperation> source = scanInternal(identifier);
        if (!source.isPresent()) {
            throw new ValidationException(String.format("Table %s was not found.", identifier));
        }
        return createTable(source.get());
    }

    /**
     * Parses the path, looks up the table and wraps it into a {@link Table}.
     *
     * @param str table path
     * @return the table
     * @throws ValidationException if no table is found under the path
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table from(String str) {
        UnresolvedIdentifier identifier = getParser().parseIdentifier(str);
        Optional<SourceQueryOperation> source = scanInternal(identifier);
        if (!source.isPresent()) {
            throw new ValidationException(String.format("Table %s was not found.", identifier));
        }
        return createTable(source.get());
    }

    /**
     * Creates a table from a descriptor by resolving it through the catalog manager and
     * wrapping it as an anonymous {@link ContextResolvedTable}.
     *
     * @param tableDescriptor table descriptor; must not be null
     * @return the resulting table
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table from(TableDescriptor tableDescriptor) {
        Preconditions.checkNotNull(tableDescriptor, "Table descriptor must not be null.");
        ContextResolvedTable anonymousTable =
                ContextResolvedTable.anonymous(catalogManager.resolveCatalogTable(tableDescriptor.toCatalogTable()));
        return createTable(new SourceQueryOperation(anonymousTable));
    }

    /**
     * Qualifies the identifier, looks the table up in the catalog manager and wraps a hit into a
     * {@link SourceQueryOperation}.
     *
     * @param unresolvedIdentifier identifier of the table
     * @return the source operation, or empty if the catalog manager returns no table
     */
    private Optional<SourceQueryOperation> scanInternal(UnresolvedIdentifier unresolvedIdentifier) {
        ObjectIdentifier qualified = catalogManager.qualifyIdentifier(unresolvedIdentifier);
        return catalogManager.getTable(qualified).map(SourceQueryOperation::new);
    }

    /**
     * Lists the catalog names reported by the catalog manager.
     *
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listCatalogs() {
        return catalogManager.listCatalogs().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the module names reported by the module manager, in the order it returns them.
     *
     * @return the module names
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listModules() {
        return moduleManager.listModules().toArray(new String[0]);
    }

    /**
     * Lists the module entries reported by the module manager's {@code listFullModules}, in the
     * order it returns them.
     *
     * @return the module entries
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public ModuleEntry[] listFullModules() {
        return moduleManager.listFullModules().toArray(new ModuleEntry[0]);
    }

    /**
     * Lists the databases of the current catalog.
     *
     * <p>The current catalog is fetched with an unchecked {@code Optional.get()}, so a missing
     * catalog surfaces as the exception thrown by that call.
     *
     * @return the database names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listDatabases() {
        Catalog currentCatalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
        return currentCatalog.listDatabases().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the table names reported by the catalog manager's no-argument {@code listTables}.
     *
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listTables() {
        return catalogManager.listTables().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the table names reported by the catalog manager for the two given names.
     *
     * @param str first name forwarded to the catalog manager (presumably the catalog - confirm)
     * @param str2 second name forwarded to the catalog manager (presumably the database - confirm)
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listTables(String str, String str2) {
        return catalogManager.listTables(str, str2).stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the view names reported by the catalog manager.
     *
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listViews() {
        return catalogManager.listViews().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the temporary table names reported by the catalog manager.
     *
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listTemporaryTables() {
        return catalogManager.listTemporaryTables().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Lists the temporary view names reported by the catalog manager.
     *
     * @return the names in natural sort order
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listTemporaryViews() {
        return catalogManager.listTemporaryViews().stream()
                .sorted()
                .toArray(String[]::new);
    }

    /**
     * Drops the temporary table registered under the given path.
     *
     * @param str table path
     * @return {@code true} if the drop completed, {@code false} if parsing, qualifying or
     *     dropping raised a {@link ValidationException}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public boolean dropTemporaryTable(String str) {
        boolean dropped;
        try {
            // Parsing and qualifying stay inside the try so their ValidationExceptions also map to false.
            ObjectIdentifier identifier = catalogManager.qualifyIdentifier(getParser().parseIdentifier(str));
            catalogManager.dropTemporaryTable(identifier, false);
            dropped = true;
        } catch (ValidationException e) {
            dropped = false;
        }
        return dropped;
    }

    /**
     * Drops the temporary view registered under the given path.
     *
     * @param str view path
     * @return {@code true} if the drop completed, {@code false} if parsing, qualifying or
     *     dropping raised a {@link ValidationException}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public boolean dropTemporaryView(String str) {
        boolean dropped;
        try {
            // Parsing and qualifying stay inside the try so their ValidationExceptions also map to false.
            ObjectIdentifier identifier = catalogManager.qualifyIdentifier(getParser().parseIdentifier(str));
            catalogManager.dropTemporaryView(identifier, false);
            dropped = true;
        } catch (ValidationException e) {
            dropped = false;
        }
        return dropped;
    }

    /**
     * Lists the user-defined function names from the function catalog.
     *
     * @return the array returned by the function catalog, sorted in place
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listUserDefinedFunctions() {
        final String[] names = functionCatalog.getUserDefinedFunctions();
        // Sorts the catalog's array itself rather than a copy.
        Arrays.sort(names);
        return names;
    }

    /**
     * Lists the function names from the function catalog.
     *
     * @return the array returned by the function catalog, sorted in place
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listFunctions() {
        final String[] names = functionCatalog.getFunctions();
        // Sorts the catalog's array itself rather than a copy.
        Arrays.sort(names);
        return names;
    }

    /**
     * Parses a single SQL statement and returns its explanation.
     *
     * <p>A parsed {@link StatementSetOperation} is unwrapped so that its contained operations
     * are explained together.
     *
     * @param str SQL text; must parse to exactly one operation
     * @param explainFormat output format of the explanation
     * @param explainDetailArr extra details to include
     * @return the explanation produced by {@link #explainInternal}
     * @throws TableException if the text does not parse to exactly one operation
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String explainSql(String str, ExplainFormat explainFormat, ExplainDetail... explainDetailArr) {
        List<Operation> operations = getParser().parse(str);
        // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
        }
        Operation single = operations.get(0);
        if (single instanceof StatementSetOperation) {
            operations = new ArrayList<>(((StatementSetOperation) single).getOperations());
        }
        return explainInternal(operations, explainFormat, explainDetailArr);
    }

    /**
     * Explains the given operations through the planner after dropping {@link NopOperation}s.
     *
     * @param list operations to explain
     * @param explainFormat output format of the explanation
     * @param explainDetailArr extra details to include
     * @return the planner's explanation, or an empty string if no operation remains
     * @throws TableException if more than one operation remains and any of them is a row-level
     *     modification
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public String explainInternal(List<Operation> list, ExplainFormat explainFormat, ExplainDetail... explainDetailArr) {
        List<Operation> effective =
                list.stream()
                        .filter(operation -> !(operation instanceof NopOperation))
                        .collect(Collectors.toList());
        if (effective.isEmpty()) {
            return "";
        }
        // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
        if (effective.size() > 1 && effective.stream().anyMatch(this::isRowLevelModification)) {
            throw new TableException("Unsupported SQL query! Only accept a single SQL statement of type DELETE, UPDATE.");
        }
        return this.planner.explain(effective, explainFormat, explainDetailArr);
    }

    /**
     * Returns completion hints from the planner's parser.
     *
     * @param str statement text forwarded to the parser
     * @param i position forwarded to the parser
     * @return the hints returned by the parser
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] getCompletionHints(String str, int i) {
        Parser plannerParser = planner.getParser();
        return plannerParser.getCompletionHints(str, i);
    }

    /**
     * Parses a single SQL query and wraps it into a {@link Table}.
     *
     * @param str SQL text; must parse to exactly one query operation
     * @return the table for the parsed query
     * @throws ValidationException if the text does not parse to exactly one operation, or the
     *     operation is not a {@link QueryOperation}, or it is also a {@link ModifyOperation}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public Table sqlQuery(String str) {
        List<Operation> operations = getParser().parse(str);
        // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
        if (operations.size() != 1) {
            throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
        }
        Operation operation = operations.get(0);
        boolean isPureQuery = operation instanceof QueryOperation && !(operation instanceof ModifyOperation);
        if (!isPureQuery) {
            throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
        }
        return createTable((QueryOperation) operation);
    }

    /**
     * Parses a single SQL statement and executes it.
     *
     * @param str SQL text; must parse to exactly one operation
     * @return the result of executing the parsed operation
     * @throws TableException if the text does not parse to exactly one operation
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public TableResult executeSql(String str) {
        List<Operation> operations = getParser().parse(str);
        // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
        if (operations.size() != 1) {
            throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
        }
        return executeInternal(operations.get(0));
    }

    /**
     * Executes a cached plan; only {@link DQLCachedPlan} is supported.
     *
     * @param cachedPlan the cached plan
     * @return the result of executing the plan's query operation
     * @throws TableException if the plan is not a {@link DQLCachedPlan}
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) {
        if (cachedPlan instanceof DQLCachedPlan) {
            DQLCachedPlan dqlPlan = (DQLCachedPlan) cachedPlan;
            return executeQueryOperation(dqlPlan.getOperation(), dqlPlan.getSinkOperation(), dqlPlan.getTransformations());
        }
        throw new TableException(String.format("Unsupported CachedPlan type: %s.", cachedPlan.getClass()));
    }

    /**
     * Creates a new statement set bound to this environment.
     *
     * @return a fresh {@link StatementSetImpl}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public StatementSet createStatementSet() {
        StatementSet statementSet = new StatementSetImpl(this);
        return statementSet;
    }

    /**
     * Loads a plan through the planner and wraps it into a {@link CompiledPlanImpl}.
     *
     * @param planReference reference to the plan
     * @return the loaded plan
     * @throws TableException if the planner fails with an {@link IOException}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public CompiledPlan loadPlan(PlanReference planReference) {
        final CompiledPlan loaded;
        try {
            loaded = new CompiledPlanImpl(this, planner.loadPlan(planReference));
        } catch (IOException e) {
            throw new TableException(String.format("Cannot load %s.", planReference), e);
        }
        return loaded;
    }

    /**
     * Parses a single SQL statement and compiles it into a plan.
     *
     * @param str SQL text; must parse to exactly one supported modify operation
     * @return the compiled plan
     * @throws TableException if the text does not parse to exactly one operation, or the
     *     operation is not a {@link ModifyOperation}, is a row-level modification, or is a
     *     {@link CreateTableASOperation}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public CompiledPlan compilePlanSql(String str) {
        List<Operation> operations = getParser().parse(str);
        // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
        if (operations.size() != 1) {
            throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
        }
        Operation operation = operations.get(0);
        if (!(operation instanceof ModifyOperation)
                || isRowLevelModification(operation)
                || operation instanceof CreateTableASOperation) {
            throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
        }
        return new CompiledPlanImpl(this, this.planner.compilePlan(Collections.singletonList((ModifyOperation) operation)));
    }

    /**
     * Translates the plan through the planner and executes it, using the plan's sink
     * identifiers after they pass through {@code deduplicateSinkIdentifierNames}.
     *
     * @param internalPlan the plan to execute
     * @return the execution result
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public TableResultInternal executePlan(InternalPlan internalPlan) {
        return executeInternal(
                planner.translatePlan(internalPlan),
                deduplicateSinkIdentifierNames(internalPlan.getSinkIdentifiers()));
    }

    /**
     * Compiles the operation into a plan and writes it to the given file path.
     *
     * <p>If the file already exists, it is loaded instead when {@code z} is set; otherwise
     * overwriting requires {@link TableConfigOptions#PLAN_FORCE_RECOMPILE} to be enabled.
     *
     * @param str path of the plan file
     * @param z if {@code true}, load the existing plan file instead of recompiling
     * @param operation a {@link StatementSetOperation} or {@link ModifyOperation} to compile
     * @return the loaded or newly compiled plan
     * @throws TableException if the file exists and may not be overwritten, if the operation
     *     type is unsupported, or if an {@link IOException} occurs
     */
    private CompiledPlan compilePlanAndWrite(String str, boolean z, Operation operation) {
        try {
            final ResourceUri planUri = new ResourceUri(ResourceType.FILE, str);
            final boolean planFileExists = this.resourceManager.exists(new Path(str));
            if (planFileExists && z) {
                return loadPlan(PlanReference.fromFile(this.resourceManager.registerFileResource(planUri)));
            }
            if (planFileExists && !((Boolean) this.tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)).booleanValue()) {
                throw new TableException(String.format("Cannot overwrite the plan file '%s'. Either manually remove the file or, if you're debugging your job, set the option '%s' to true.", str, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
            }

            final CompiledPlan compiledPlan;
            if (operation instanceof StatementSetOperation) {
                compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
            } else if (operation instanceof ModifyOperation) {
                compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
            } else {
                throw new TableException("Unsupported operation to compile: " + operation.getClass() + ". This is a bug, please file an issue.");
            }

            this.resourceManager.syncFileResource(planUri, localPath -> {
                compiledPlan.writeToFile(localPath, false);
            });
            return compiledPlan;
        } catch (IOException e) {
            throw new TableException(String.format("Failed to execute %s statement.", operation.asSummaryString()), e);
        }
    }

    /**
     * Compiles the modify operations through the planner and wraps the result into a
     * {@link CompiledPlanImpl}.
     *
     * @param list operations to compile
     * @return the compiled plan
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public CompiledPlan compilePlan(List<ModifyOperation> list) {
        CompiledPlan compiled = new CompiledPlanImpl(this, planner.compilePlan(list));
        return compiled;
    }

    /**
     * Executes the given modify operations as one job.
     *
     * <p>{@link CreateTableASOperation} and {@link ReplaceTableAsOperation} entries are replaced
     * by the result of {@code getModifyOperation}, which also receives the job status hook list.
     * A row-level modification (DELETE/UPDATE) must be the only operation and is rejected in
     * streaming mode; a {@link DeleteFromFilterOperation} is executed directly and its result
     * returned immediately.
     *
     * @param list operations to execute
     * @return the execution result
     * @throws TableException if a row-level modification is combined with other operations or
     *     used in streaming mode
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public TableResultInternal executeInternal(List<ModifyOperation> list) {
        List<ModifyOperation> mappedOperations = new ArrayList<>();
        List<JobStatusHook> jobStatusHooks = new LinkedList<>();
        for (ModifyOperation modifyOperation : list) {
            if (modifyOperation instanceof CreateTableASOperation) {
                mappedOperations.add(getModifyOperation((CreateTableASOperation) modifyOperation, jobStatusHooks));
            } else if (modifyOperation instanceof ReplaceTableAsOperation) {
                mappedOperations.add(getModifyOperation((ReplaceTableAsOperation) modifyOperation, jobStatusHooks));
            } else {
                if (isRowLevelModification(modifyOperation)) {
                    String modifyType = ((SinkModifyOperation) modifyOperation).isDelete() ? "DELETE" : "UPDATE";
                    // Compare against the literal 1: an int cannot be compared with the boolean IS_STREAM_TABLE.
                    if (list.size() > 1) {
                        throw new TableException(String.format("Unsupported SQL query! Only accept a single SQL statement of type %s.", modifyType));
                    }
                    if (this.isStreamingMode) {
                        throw new TableException(String.format("%s statement is not supported for streaming mode now.", modifyType));
                    }
                    if (modifyOperation instanceof DeleteFromFilterOperation) {
                        return executeInternal((DeleteFromFilterOperation) modifyOperation);
                    }
                }
                mappedOperations.add(modifyOperation);
            }
        }
        return executeInternal(translate(mappedOperations), extractSinkIdentifierNames(mappedOperations), jobStatusHooks);
    }

    /**
     * Converts a REPLACE TABLE AS / CREATE OR REPLACE TABLE AS operation into the modify
     * operation that writes the query result.
     *
     * <p>If a staging-capable sink is available, staging is applied, a
     * {@link StagingSinkJobStatusHook} is appended to {@code list}, and a staged sink modify
     * operation is returned. Otherwise an existing table is dropped, the create-table operation
     * is executed, and a regular sink modify operation is returned.
     *
     * @param replaceTableAsOperation the RTAS operation to convert
     * @param list collector for job status hooks; receives one hook when staging is used
     * @return the modify operation writing into the (re)created table
     * @throws TableException if the table does not exist and the statement is not
     *     CREATE OR REPLACE
     */
    private ModifyOperation getModifyOperation(ReplaceTableAsOperation replaceTableAsOperation, List<JobStatusHook> list) {
        CreateTableOperation createTableOperation = replaceTableAsOperation.getCreateTableOperation();
        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
        Optional<ContextResolvedTable> table = this.catalogManager.getTable(tableIdentifier);
        if (!replaceTableAsOperation.isCreateOrReplace() && !table.isPresent()) {
            throw new TableException(String.format("The table %s to be replaced doesn't exist. You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.", tableIdentifier));
        }
        Catalog catalog = this.catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
        ResolvedCatalogTable resolvedTable = this.catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
        Optional<DynamicTableSink> stagingSink = getSupportsStagingDynamicTableSink(createTableOperation, catalog, resolvedTable);
        if (stagingSink.isPresent()) {
            // Keep the sink typed as DynamicTableSink (that is what is handed on below) and cast
            // to SupportsStaging only for applyStaging; the helper above only returns sinks that
            // are instances of SupportsStaging, so the cast is safe.
            DynamicTableSink dynamicTableSink = stagingSink.get();
            SupportsStaging.StagingPurpose stagingPurpose = replaceTableAsOperation.isCreateOrReplace()
                    ? SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS
                    : SupportsStaging.StagingPurpose.REPLACE_TABLE_AS;
            list.add(new StagingSinkJobStatusHook(((SupportsStaging) dynamicTableSink).applyStaging(new SinkStagingContext(stagingPurpose))));
            return replaceTableAsOperation.toStagedSinkModifyOperation(tableIdentifier, resolvedTable, catalog, dynamicTableSink);
        }
        // Non-staged path: drop the old table (if any) and create the new one before writing.
        if (table.isPresent()) {
            this.catalogManager.dropTable(tableIdentifier, false);
        }
        executeInternal(createTableOperation);
        return replaceTableAsOperation.toSinkModifyOperation(this.catalogManager);
    }

    /**
     * Converts a CREATE TABLE AS operation into the modify operation that writes the query
     * result.
     *
     * <p>Without a staging-capable sink, the create-table operation is executed and a regular
     * sink modify operation is returned. With one, staging is applied, a
     * {@link StagingSinkJobStatusHook} is appended to {@code list}, and a staged sink modify
     * operation is returned.
     *
     * @param createTableASOperation the CTAS operation to convert
     * @param list collector for job status hooks; receives one hook when staging is used
     * @return the modify operation writing into the created table
     */
    private ModifyOperation getModifyOperation(CreateTableASOperation createTableASOperation, List<JobStatusHook> list) {
        CreateTableOperation createTableOperation = createTableASOperation.getCreateTableOperation();
        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
        Catalog catalog = this.catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
        ResolvedCatalogTable resolvedTable = this.catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
        Optional<DynamicTableSink> stagingSink = getSupportsStagingDynamicTableSink(createTableOperation, catalog, resolvedTable);
        if (!stagingSink.isPresent()) {
            executeInternal(createTableOperation);
            return createTableASOperation.toSinkModifyOperation(this.catalogManager);
        }
        // Keep the sink typed as DynamicTableSink (that is what is handed on below) and cast to
        // SupportsStaging only for applyStaging; the helper above only returns sinks that are
        // instances of SupportsStaging, so the cast is safe.
        DynamicTableSink dynamicTableSink = stagingSink.get();
        SupportsStaging.StagingPurpose stagingPurpose = createTableOperation.isIgnoreIfExists()
                ? SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
                : SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
        list.add(new StagingSinkJobStatusHook(((SupportsStaging) dynamicTableSink).applyStaging(new SinkStagingContext(stagingPurpose))));
        return createTableASOperation.toStagedSinkModifyOperation(tableIdentifier, resolvedTable, catalog, dynamicTableSink);
    }

    /**
     * Looks up a sink for the table being created that implements {@link SupportsStaging}.
     *
     * <p>Returns an empty result when the CTAS/RTAS atomicity option is disabled, when the table
     * uses legacy connector options, or when the created sink is not a {@link SupportsStaging}.
     *
     * @param createTableOperation the create-table operation describing the target table
     * @param catalog the catalog the table belongs to
     * @param resolvedCatalogTable the resolved definition of the target table
     * @return the staging-capable sink, if any
     * @throws TableException if creating the sink fails
     */
    private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(CreateTableOperation createTableOperation, Catalog catalog, ResolvedCatalogTable resolvedCatalogTable) {
        boolean atomicityEnabled = ((Boolean) this.tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)).booleanValue();
        if (!atomicityEnabled) {
            return Optional.empty();
        }
        boolean legacyOptions = TableFactoryUtil.isLegacyConnectorOptions(catalog, this.tableConfig, this.isStreamingMode, createTableOperation.getTableIdentifier(), resolvedCatalogTable, createTableOperation.isTemporary());
        if (legacyOptions) {
            return Optional.empty();
        }
        DynamicTableSink sink;
        try {
            sink = ExecutableOperationUtils.createDynamicTableSink(
                    catalog,
                    () -> this.moduleManager.getFactory(loadedModule -> loadedModule.getTableSinkFactory()),
                    createTableOperation.getTableIdentifier(),
                    resolvedCatalogTable,
                    Collections.emptyMap(),
                    this.tableConfig,
                    this.resourceManager.getUserClassLoader(),
                    createTableOperation.isTemporary());
        } catch (Exception e) {
            throw new TableException(String.format("Fail to create DynamicTableSink for the table %s, maybe the table does not support atomicity of CTAS/RTAS, please set %s to false and try again.", createTableOperation.getTableIdentifier(), TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()), e);
        }
        if (sink instanceof SupportsStaging) {
            return Optional.of(sink);
        }
        return Optional.empty();
    }

    /**
     * Executes a delete that was pushed down to the sink.
     *
     * @param deleteFromFilterOperation the operation carrying the delete-push-down sink
     * @return a single-row, single-column ("rows affected") result when the sink reports a
     *     value, otherwise {@code TableResultImpl.TABLE_RESULT_OK}
     */
    private TableResultInternal executeInternal(DeleteFromFilterOperation deleteFromFilterOperation) {
        Optional<?> affectedRows = deleteFromFilterOperation.getSupportsDeletePushDownSink().executeDeletion();
        if (!affectedRows.isPresent()) {
            return TableResultImpl.TABLE_RESULT_OK;
        }
        return TableResultImpl.builder()
                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                .schema(ResolvedSchema.of(new Column[]{Column.physical("rows affected", DataTypes.BIGINT())}))
                .data(Collections.singletonList(Row.of(new Object[]{affectedRows.get()})))
                .build();
    }

    /**
     * Executes the given transformations without any job status hooks.
     *
     * @param list the transformations to execute
     * @param list2 the sink identifier names passed through to the three-argument overload
     * @return the result of the three-argument overload
     */
    private TableResultInternal executeInternal(List<Transformation<?>> list, List<String> list2) {
        List<JobStatusHook> noHooks = Collections.emptyList();
        return executeInternal(list, list2, noHooks);
    }

    /**
     * Submits the given transformations as one asynchronous job and builds the insert result.
     *
     * <p>The job name is {@code "insert-into_"} followed by the comma-joined sink names. The
     * result schema has one BIGINT column per transformation, named after the sink name at the
     * same index, and every affected-row count is initialised to -1. When
     * {@code TableConfigOptions.TABLE_DML_SYNC} is set, the call waits for the job to finish.
     *
     * @param list the transformations to execute
     * @param list2 sink identifier names; read by index for each transformation
     * @param list3 job status hooks handed to the pipeline
     * @return the table result bound to the submitted job
     * @throws TableException if submitting or (in sync mode) awaiting the job fails
     */
    private TableResultInternal executeInternal(List<Transformation<?>> list, List<String> list2, List<JobStatusHook> list3) {
        String jobName = "insert-into_" + String.join(",", list2);
        this.resourceManager.addJarConfiguration(this.tableConfig);
        try {
            JobClient jobClient = this.execEnv.executeAsync(this.execEnv.createPipeline(list, this.tableConfig.getConfiguration(), jobName, list3));
            List<Column> columns = new ArrayList<>();
            Long[] affectedRowCounts = new Long[list.size()];
            // NOTE(review): list2 is indexed up to list.size(); this assumes both lists have the
            // same length - confirm against callers.
            for (int i = 0; i < list.size(); i++) {
                columns.add(Column.physical(list2.get(i), DataTypes.BIGINT()));
                affectedRowCounts[i] = -1L;
            }
            TableResultInternal result = TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(ResolvedSchema.of(columns))
                    .resultProvider(new InsertResultProvider(affectedRowCounts).setJobClient(jobClient))
                    .build();
            if (((Boolean) this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)).booleanValue()) {
                try {
                    result.await();
                } catch (InterruptedException | ExecutionException e) {
                    result.getJobClient().ifPresent(JobClient::cancel);
                    if (e instanceof InterruptedException) {
                        // Restore the interrupt flag so callers further up can observe it.
                        Thread.currentThread().interrupt();
                    }
                    throw new TableException("Fail to wait execution finish.", e);
                }
            }
            return result;
        } catch (Exception e2) {
            // Also wraps the TableException thrown by the sync-wait branch above.
            throw new TableException("Failed to execute sql", e2);
        }
    }

    /**
     * Submits the transformations of a collecting query as a job named {@code "collect"} and
     * builds the table result that serves the collected rows.
     *
     * @param queryOperation the query whose resolved schema becomes the result schema
     * @param collectModifyOperation the collect sink operation supplying the result provider and
     *     the consumed data type
     * @param list the transformations to execute
     * @return the table result bound to the submitted job
     * @throws TableException if any step of submitting the job or building the result fails
     */
    private TableResultInternal executeQueryOperation(QueryOperation queryOperation, CollectModifyOperation collectModifyOperation, List<Transformation<?>> list) {
        this.resourceManager.addJarConfiguration(this.tableConfig);
        try {
            JobClient jobClient = this.execEnv.executeAsync(this.execEnv.createPipeline(list, this.tableConfig.getConfiguration(), "collect"));

            // Reset the provider before binding it to the newly submitted job.
            ResultProvider resultProvider = collectModifyOperation.getSelectResultProvider();
            resultProvider.reset();
            resultProvider.setJobClient(jobClient);

            ResolvedSchema consumedSchema = DataTypeUtils.expandCompositeTypeToSchema(collectModifyOperation.getConsumedDataType());
            int maxColumnWidth = ((Integer) getConfig().get(TableConfigOptions.DISPLAY_MAX_COLUMN_WIDTH)).intValue();

            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(queryOperation.getResolvedSchema())
                    .resultProvider(resultProvider)
                    .setPrintStyle(PrintStyle.tableauWithTypeInferredColumnWidths(consumedSchema, resultProvider.getRowDataStringConverter(), maxColumnWidth, false, this.isStreamingMode))
                    .setCachedPlan(new DQLCachedPlan(queryOperation, collectModifyOperation, list))
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }

    /**
     * Executes a single operation by dispatching on its concrete kind.
     *
     * <p>Handled kinds, in order: executable operations, modify operations, statement sets,
     * EXPLAIN, queries, EXECUTE PLAN, COMPILE PLAN, COMPILE AND EXECUTE PLAN, ANALYZE TABLE and
     * no-op operations. Anything else is rejected.
     *
     * @param operation the operation to execute
     * @return the result of the execution
     * @throws TableException if the operation kind is unsupported, if ANALYZE TABLE is used in
     *     streaming mode, or if the underlying execution fails
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public TableResultInternal executeInternal(Operation operation) {
        if (operation instanceof ExecutableOperation) {
            return ((ExecutableOperation) operation).execute(this.operationCtx);
        }
        if (operation instanceof ModifyOperation) {
            return executeInternal(Collections.singletonList((ModifyOperation) operation));
        }
        if (operation instanceof StatementSetOperation) {
            return executeInternal(((StatementSetOperation) operation).getOperations());
        }
        if (operation instanceof ExplainOperation) {
            ExplainOperation explainOperation = (ExplainOperation) operation;
            ExplainDetail[] details = explainOperation.getExplainDetails().stream()
                    .map(ExplainDetail::valueOf)
                    .toArray(ExplainDetail[]::new);
            Operation child = explainOperation.getChild();
            // A statement set is explained as the list of its operations, anything else as itself.
            List<Operation> toExplain;
            if (child instanceof StatementSetOperation) {
                toExplain = new ArrayList<>(((StatementSetOperation) child).getOperations());
            } else {
                toExplain = Collections.singletonList(child);
            }
            return TableResultImpl.builder()
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .schema(ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.STRING())}))
                    .data(Collections.singletonList(Row.of(new Object[]{explainInternal(toExplain, details)})))
                    .build();
        }
        if (operation instanceof QueryOperation) {
            QueryOperation query = (QueryOperation) operation;
            CollectModifyOperation collectSink = new CollectModifyOperation(query);
            return executeQueryOperation(query, collectSink, translate(Collections.singletonList(collectSink)));
        }
        if (operation instanceof ExecutePlanOperation) {
            try {
                ResourceUri planUri = new ResourceUri(ResourceType.FILE, ((ExecutePlanOperation) operation).getFilePath());
                return (TableResultInternal) executePlan(PlanReference.fromFile(this.resourceManager.registerFileResource(planUri)));
            } catch (IOException e) {
                throw new TableException(String.format("Failed to execute %s statement.", operation.asSummaryString()), e);
            }
        }
        if (operation instanceof CompilePlanOperation) {
            CompilePlanOperation compileOperation = (CompilePlanOperation) operation;
            compilePlanAndWrite(compileOperation.getFilePath(), compileOperation.isIfNotExists(), compileOperation.getOperation());
            return TableResultImpl.TABLE_RESULT_OK;
        }
        if (operation instanceof CompileAndExecutePlanOperation) {
            CompileAndExecutePlanOperation compileAndExecute = (CompileAndExecutePlanOperation) operation;
            return (TableResultInternal) compilePlanAndWrite(compileAndExecute.getFilePath(), true, compileAndExecute.getOperation()).execute();
        }
        if (operation instanceof AnalyzeTableOperation) {
            if (this.isStreamingMode) {
                throw new TableException("ANALYZE TABLE is not supported for streaming mode now");
            }
            try {
                return AnalyzeTableUtil.analyzeTable(this, (AnalyzeTableOperation) operation);
            } catch (Exception e) {
                throw new TableException("Failed to execute ANALYZE TABLE command", e);
            }
        }
        if (operation instanceof NopOperation) {
            return TableResultImpl.TABLE_RESULT_OK;
        }
        throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
    }

    /**
     * Collects the summary string of each operation's sink table identifier and makes duplicate
     * names distinguishable via {@code deduplicateSinkIdentifierNames}.
     *
     * @param list the operations; each must be a {@link SinkModifyOperation}
     * @return one name per operation, in input order
     * @throws UnsupportedOperationException if an operation is not a {@link SinkModifyOperation}
     */
    private List<String> extractSinkIdentifierNames(List<ModifyOperation> list) {
        List<String> sinkNames = new ArrayList<>(list.size());
        for (ModifyOperation candidate : list) {
            if (candidate instanceof SinkModifyOperation) {
                SinkModifyOperation sinkOperation = (SinkModifyOperation) candidate;
                sinkNames.add(sinkOperation.getContextResolvedTable().getIdentifier().asSummaryString());
            } else {
                throw new UnsupportedOperationException("Unsupported operation: " + candidate);
            }
        }
        return deduplicateSinkIdentifierNames(sinkNames);
    }

    /**
     * Makes repeated sink names distinguishable.
     *
     * <p>A name that occurs exactly once is returned unchanged. A name that occurs several times
     * gets the suffix {@code "_" + k}, where {@code k} is the 1-based position of that occurrence
     * among equal names. For example {@code [a, b, a]} becomes {@code [a_1, b, a_2]}.
     *
     * @param list the sink names, possibly containing duplicates
     * @return a new list with the same size and order as the input
     */
    private List<String> deduplicateSinkIdentifierNames(List<String> list) {
        // First pass: how often does each name occur overall?
        HashMap<String, Integer> totalOccurrences = new HashMap<>();
        for (String name : list) {
            totalOccurrences.merge(name, 1, Integer::sum);
        }
        // Second pass: number each occurrence of a repeated name, counting from 1.
        HashMap<String, Integer> occurrencesSoFar = new HashMap<>();
        return list.stream().map(name -> {
            if (totalOccurrences.get(name) == 1) {
                return name;
            }
            int ordinal = occurrencesSoFar.merge(name, 1, Integer::sum);
            return name + "_" + ordinal;
        }).collect(Collectors.toList());
    }

    /**
     * Returns the current catalog as reported by the catalog manager.
     *
     * @return the value of {@code CatalogManager#getCurrentCatalog()}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String getCurrentCatalog() {
        return catalogManager.getCurrentCatalog();
    }

    /**
     * Sets the current catalog on the catalog manager.
     *
     * @param str the catalog name handed to {@code CatalogManager#setCurrentCatalog}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void useCatalog(String str) {
        catalogManager.setCurrentCatalog(str);
    }

    /**
     * Returns the current database as reported by the catalog manager.
     *
     * @return the value of {@code CatalogManager#getCurrentDatabase()}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public String getCurrentDatabase() {
        return catalogManager.getCurrentDatabase();
    }

    /**
     * Sets the current database on the catalog manager.
     *
     * @param str the database name handed to {@code CatalogManager#setCurrentDatabase}
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public void useDatabase(String str) {
        catalogManager.setCurrentDatabase(str);
    }

    /**
     * Returns the table configuration held by this environment.
     *
     * @return the {@link TableConfig} instance stored in this environment
     */
    @Override // org.apache.flink.table.api.TableEnvironment
    public TableConfig getConfig() {
        return tableConfig;
    }

    /**
     * Returns the parser of the planner obtained from {@code getPlanner()}.
     *
     * @return the planner's parser
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public Parser getParser() {
        return this.getPlanner().getParser();
    }

    /**
     * Returns the catalog manager held by this environment.
     *
     * @return the {@link CatalogManager} instance stored in this environment
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public CatalogManager getCatalogManager() {
        return catalogManager;
    }

    /**
     * Returns the operation tree builder held by this environment.
     *
     * @return the {@code OperationTreeBuilder} instance stored in this environment
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public OperationTreeBuilder getOperationTreeBuilder() {
        return operationTreeBuilder;
    }

    /**
     * Hook for qualifying a query operation against an object identifier.
     *
     * <p>This implementation ignores {@code objectIdentifier} and returns the query operation
     * unchanged. The method is {@code protected}, so subclasses can override it.
     *
     * @param objectIdentifier the identifier associated with the query (unused here)
     * @param queryOperation the query operation to qualify
     * @return the same {@code queryOperation} instance
     */
    protected QueryOperation qualifyQueryOperation(ObjectIdentifier objectIdentifier, QueryOperation queryOperation) {
        return queryOperation;
    }

    /**
     * Validates a table source against its own table schema by delegating to
     * {@code TableSourceValidation.validateTableSource}.
     *
     * @param tableSource the table source to validate
     */
    protected void validateTableSource(TableSource<?> tableSource) {
        TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema());
    }

    /**
     * Translates the given modify operations into transformations using the planner.
     *
     * @param list the modify operations to translate
     * @return the transformations produced by the planner
     */
    protected List<Transformation<?>> translate(List<ModifyOperation> list) {
        return planner.translate(list);
    }

    /**
     * Registers a table source as a temporary table under the given name.
     *
     * <p>If no temporary table with that name exists, a source-only table is created. If a
     * temporary {@link ConnectorCatalogTable} without a source exists, it is replaced by a table
     * combining the new source with the existing sink. Every other case is rejected.
     *
     * @param str the table name, qualified against the catalog manager's current namespace
     * @param tableSource the source to register; validated first
     * @throws ValidationException if a temporary table with that name already exists and is
     *     either not a {@link ConnectorCatalogTable} or already has a source
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public void registerTableSourceInternal(String str, TableSource<?> tableSource) {
        validateTableSource(tableSource);
        ObjectIdentifier identifier = this.catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(new String[]{str}));
        Optional<CatalogBaseTable> existing = getTemporaryTable(identifier);
        if (!existing.isPresent()) {
            this.catalogManager.createTemporaryTable(ConnectorCatalogTable.source(tableSource, false), identifier, false);
            return;
        }
        CatalogBaseTable existingTable = existing.get();
        if (!(existingTable instanceof ConnectorCatalogTable)) {
            throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
        }
        // Explicit downcast: Optional<CatalogBaseTable>.get() cannot be assigned to
        // ConnectorCatalogTable directly; the instanceof check above makes the cast safe.
        ConnectorCatalogTable connectorTable = (ConnectorCatalogTable) existingTable;
        if (connectorTable.getTableSource().isPresent()) {
            throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
        }
        // NOTE(review): assumes a ConnectorCatalogTable without a source always carries a sink,
        // otherwise the get() below throws - confirm.
        CatalogBaseTable sourceAndSink = ConnectorCatalogTable.sourceAndSink(tableSource, (TableSink) connectorTable.getTableSink().get(), false);
        this.catalogManager.dropTemporaryTable(identifier, false);
        this.catalogManager.createTemporaryTable(sourceAndSink, identifier, false);
    }

    /**
     * Registers a table sink as a temporary table under the given name.
     *
     * <p>If no temporary table with that name exists, a sink-only table is created. If a
     * temporary {@link ConnectorCatalogTable} without a sink exists, it is replaced by a table
     * combining the existing source with the new sink. Every other case is rejected.
     *
     * @param str the table name, qualified against the catalog manager's current namespace
     * @param tableSink the sink to register
     * @throws ValidationException if a temporary table with that name already exists and is
     *     either not a {@link ConnectorCatalogTable} or already has a sink
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public void registerTableSinkInternal(String str, TableSink<?> tableSink) {
        ObjectIdentifier identifier = this.catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(new String[]{str}));
        Optional<CatalogBaseTable> existing = getTemporaryTable(identifier);
        if (!existing.isPresent()) {
            this.catalogManager.createTemporaryTable(ConnectorCatalogTable.sink(tableSink, false), identifier, false);
            return;
        }
        CatalogBaseTable existingTable = existing.get();
        if (!(existingTable instanceof ConnectorCatalogTable)) {
            throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
        }
        // Explicit downcast: Optional<CatalogBaseTable>.get() cannot be assigned to
        // ConnectorCatalogTable directly; the instanceof check above makes the cast safe.
        ConnectorCatalogTable connectorTable = (ConnectorCatalogTable) existingTable;
        if (connectorTable.getTableSink().isPresent()) {
            throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
        }
        // NOTE(review): assumes a ConnectorCatalogTable without a sink always carries a source,
        // otherwise the get() below throws - confirm.
        CatalogBaseTable sourceAndSink = ConnectorCatalogTable.sourceAndSink((TableSource) connectorTable.getTableSource().get(), tableSink, false);
        this.catalogManager.dropTemporaryTable(identifier, false);
        this.catalogManager.createTemporaryTable(sourceAndSink, identifier, false);
    }

    /**
     * Looks up the table with the given identifier and returns its resolved table only when the
     * table is temporary.
     *
     * @param objectIdentifier the identifier of the table to look up
     * @return the resolved temporary table, or empty if no table is found or it is not temporary
     */
    private Optional<CatalogBaseTable> getTemporaryTable(ObjectIdentifier objectIdentifier) {
        return catalogManager
                .getTable(objectIdentifier)
                .filter(ContextResolvedTable::isTemporary)
                .map(ContextResolvedTable::getResolvedTable);
    }

    /**
     * Creates a {@link TableImpl} for the given query operation, bound to this environment.
     *
     * <p>The function lookup passed along is obtained from the function catalog using the
     * parser's {@code parseIdentifier} method.
     *
     * @param queryOperation the query operation backing the table
     * @return the created table
     */
    @VisibleForTesting
    public TableImpl createTable(QueryOperation queryOperation) {
        return TableImpl.createTable(
                this,
                queryOperation,
                this.operationTreeBuilder,
                this.functionCatalog.asLookup(getParser()::parseIdentifier));
    }

    /**
     * Explains the given plan by delegating to the planner.
     *
     * @param internalPlan the plan to explain
     * @param explainDetailArr the explain details passed through to the planner
     * @return the explanation produced by the planner
     */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentInternal
    public String explainPlan(InternalPlan internalPlan, ExplainDetail... explainDetailArr) {
        return planner.explainPlan(internalPlan, explainDetailArr);
    }

    /**
     * Tells whether the operation is a sink modify operation flagged as delete or update.
     *
     * @param operation the operation to inspect
     * @return {@code true} if {@code operation} is a {@link SinkModifyOperation} whose
     *     {@code isDelete()} or {@code isUpdate()} returns {@code true}
     */
    private boolean isRowLevelModification(Operation operation) {
        if (operation instanceof SinkModifyOperation) {
            SinkModifyOperation sinkOperation = (SinkModifyOperation) operation;
            return sinkOperation.isDelete() || sinkOperation.isUpdate();
        }
        return false;
    }
}
