package org.apache.flink.table.gateway.service.materializedtable;

import java.io.IOException;
import java.net.URLClassLoader;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.Constants;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.workflow.scheduler.QuartzSchedulerUtils;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
import org.apache.flink.table.refresh.RefreshHandler;
import org.apache.flink.table.runtime.application.SqlDriver;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.IntervalFreshnessUtils;
import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow;
import org.apache.flink.table.workflow.DeleteRefreshWorkflow;
import org.apache.flink.table.workflow.ResumeRefreshWorkflow;
import org.apache.flink.table.workflow.SuspendRefreshWorkflow;
import org.apache.flink.table.workflow.WorkflowScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.class */
public class MaterializedTableManager {
    /** Logger shared by all instances of this manager. */
    private static final Logger LOG = LoggerFactory.getLogger(MaterializedTableManager.class);
    /** Class loader received by the constructor; it is also passed to the refresh-handler serializer when deserializing. */
    private final URLClassLoader userCodeClassLoader;

    /** Scheduler used for refresh workflows; may be null, and the methods that need it check for null first. */
    @Nullable
    private final WorkflowScheduler<? extends RefreshHandler> workflowScheduler;
    /** URL of the form {@code http://<address>:<port>} built from the REST endpoint configuration. */
    private final String restEndpointUrl;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager$JobExecutionResult.class */
    /**
     * Immutable holder for the three values that describe a submitted refresh job: the execution
     * target, the cluster id and the job id. The enclosing class reads the fields directly.
     */
    public static class JobExecutionResult {
        private final String executionTarget;
        private final String clusterId;
        private final String jobId;

        /**
         * @param str execution target, stored in {@code executionTarget}
         * @param str2 cluster id, stored in {@code clusterId}
         * @param str3 job id, stored in {@code jobId}
         */
        public JobExecutionResult(String str, String str2, String str3) {
            this.executionTarget = str;
            this.clusterId = str2;
            this.jobId = str3;
        }
    }

    /**
     * Creates the manager: keeps the given class loader, derives the REST endpoint URL from the
     * configuration and asks {@code buildWorkflowScheduler} for a workflow scheduler.
     *
     * @param configuration configuration used for the REST endpoint URL and the scheduler lookup
     * @param uRLClassLoader class loader kept as the user-code class loader
     */
    public MaterializedTableManager(Configuration configuration, URLClassLoader uRLClassLoader) {
        this.userCodeClassLoader = uRLClassLoader;
        this.restEndpointUrl = buildRestEndpointUrl(configuration);
        this.workflowScheduler = buildWorkflowScheduler(configuration, uRLClassLoader);
    }

    /**
     * Builds the {@code http://<address>:<port>} URL of the SQL gateway REST endpoint.
     *
     * <p>Only the options scoped to the REST endpoint identifier are consulted.
     *
     * @param configuration configuration that contains the REST endpoint options
     * @return the endpoint URL
     */
    private String buildRestEndpointUrl(Configuration configuration) {
        final Configuration restEndpointConf =
                Configuration.fromMap(
                        SqlGatewayEndpointFactoryUtils.getEndpointConfig(
                                configuration, SqlGatewayRestEndpointFactory.IDENTIFIER));
        final String address = restEndpointConf.get(SqlGatewayRestOptions.ADDRESS);
        // Unboxing mirrors the original: a missing port value fails here rather than being formatted.
        final int port = restEndpointConf.get(SqlGatewayRestOptions.PORT);
        return String.format("http://%s:%s", address, port);
    }

    /**
     * Delegates to {@code WorkflowSchedulerFactoryUtil.createWorkflowScheduler} with the given
     * configuration and class loader.
     *
     * <p>NOTE(review): the result is stored in a {@code @Nullable} field, so the factory presumably
     * returns null when no scheduler is configured; confirm against the factory.
     */
    private WorkflowScheduler<? extends RefreshHandler> buildWorkflowScheduler(Configuration configuration, URLClassLoader uRLClassLoader) {
        return WorkflowSchedulerFactoryUtil.createWorkflowScheduler(configuration, uRLClassLoader);
    }

    /**
     * Opens the workflow scheduler when one is present; otherwise does nothing.
     *
     * @throws Exception whatever the scheduler's {@code open()} throws
     */
    public void open() throws Exception {
        if (this.workflowScheduler == null) {
            return;
        }
        this.workflowScheduler.open();
    }

    /**
     * Closes the workflow scheduler when one is present; otherwise does nothing.
     *
     * @throws Exception whatever the scheduler's {@code close()} throws
     */
    public void close() throws Exception {
        if (this.workflowScheduler == null) {
            return;
        }
        this.workflowScheduler.close();
    }

    /**
     * Routes a materialized-table operation to the handler that matches its concrete type.
     *
     * <p>The checks run in a fixed order: create, refresh, suspend, resume, drop, alter-as-query.
     *
     * @param operationExecutor executor passed through to the handler
     * @param operationHandle handle passed through to the handler
     * @param materializedTableOperation operation to execute
     * @param str not used by this method
     * @return the result produced by the selected handler
     * @throws SqlExecutionException if the operation type is not one of the supported ones
     */
    public ResultFetcher callMaterializedTableOperation(OperationExecutor operationExecutor, OperationHandle operationHandle, MaterializedTableOperation materializedTableOperation, String str) {
        final MaterializedTableOperation op = materializedTableOperation;
        if (op instanceof CreateMaterializedTableOperation) {
            return callCreateMaterializedTableOperation(
                    operationExecutor, operationHandle, (CreateMaterializedTableOperation) op);
        } else if (op instanceof AlterMaterializedTableRefreshOperation) {
            return callAlterMaterializedTableRefreshOperation(
                    operationExecutor, operationHandle, (AlterMaterializedTableRefreshOperation) op);
        } else if (op instanceof AlterMaterializedTableSuspendOperation) {
            return callAlterMaterializedTableSuspend(
                    operationExecutor, operationHandle, (AlterMaterializedTableSuspendOperation) op);
        } else if (op instanceof AlterMaterializedTableResumeOperation) {
            return callAlterMaterializedTableResume(
                    operationExecutor, operationHandle, (AlterMaterializedTableResumeOperation) op);
        } else if (op instanceof DropMaterializedTableOperation) {
            return callDropMaterializedTableOperation(
                    operationExecutor, operationHandle, (DropMaterializedTableOperation) op);
        } else if (op instanceof AlterMaterializedTableAsQueryOperation) {
            return callAlterMaterializedTableAsQueryOperation(
                    operationExecutor, operationHandle, (AlterMaterializedTableAsQueryOperation) op);
        }
        final String summary = op.asSummaryString();
        throw new SqlExecutionException(
                String.format("Unsupported Operation %s for materialized table.", summary));
    }

    /**
     * Creates a materialized table, choosing the continuous or the full-refresh path from the
     * table's refresh mode, and reports OK once the chosen path has finished.
     */
    private ResultFetcher callCreateMaterializedTableOperation(OperationExecutor operationExecutor, OperationHandle operationHandle, CreateMaterializedTableOperation createMaterializedTableOperation) {
        final boolean continuous =
                createMaterializedTableOperation.getCatalogMaterializedTable().getRefreshMode()
                        == CatalogMaterializedTable.RefreshMode.CONTINUOUS;
        if (!continuous) {
            // Every mode other than CONTINUOUS goes through the workflow-based path.
            createMaterializedTableInFullMode(
                    operationExecutor, operationHandle, createMaterializedTableOperation);
        } else {
            createMaterializedTableInContinuousMode(
                    operationExecutor, operationHandle, createMaterializedTableOperation);
        }
        return ResultFetcher.fromTableResult(
                operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
    }

    /**
     * Executes the create operation and then submits the table's continuous refresh job.
     *
     * <p>If submitting the job fails, the table that was just created is dropped again so that no
     * table is left behind without a refresh job. A failure of that cleanup no longer replaces the
     * submission error: it is logged and attached to the submission error as a suppressed exception.
     *
     * @throws SqlExecutionException if the continuous refresh job cannot be submitted; the cause is
     *     the submission failure
     */
    private void createMaterializedTableInContinuousMode(OperationExecutor operationExecutor, OperationHandle operationHandle, CreateMaterializedTableOperation createMaterializedTableOperation) {
        operationExecutor.callExecutableOperation(operationHandle, createMaterializedTableOperation);
        ObjectIdentifier tableIdentifier = createMaterializedTableOperation.getTableIdentifier();
        try {
            executeContinuousRefreshJob(operationExecutor, operationHandle, createMaterializedTableOperation.getCatalogMaterializedTable(), tableIdentifier, Collections.emptyMap(), Optional.empty());
        } catch (Exception e) {
            try {
                operationExecutor.callExecutableOperation(operationHandle, new DropMaterializedTableOperation(tableIdentifier, true));
            } catch (Exception cleanupFailure) {
                // Keep the submission failure as the primary error; the cleanup failure rides along.
                e.addSuppressed(cleanupFailure);
                LOG.warn("Failed to drop materialized table {} after its continuous refresh job could not be submitted.", tableIdentifier, cleanupFailure);
            }
            throw new SqlExecutionException(String.format("Submit continuous refresh job for materialized table %s occur exception.", tableIdentifier), e);
        }
    }

    /**
     * Executes the create operation and then registers a periodic refresh workflow for the table.
     *
     * <p>The cron expression is derived from the table's definition freshness. After the workflow
     * is created, the table is updated to ACTIVATED together with the serialized refresh handler.
     * If either step fails, the table that was just created is dropped again. A failure of that
     * cleanup no longer replaces the original error: it is logged and attached to the original
     * error as a suppressed exception.
     *
     * @throws SqlExecutionException if no workflow scheduler is configured, or if creating the
     *     workflow or recording its handler fails
     */
    private void createMaterializedTableInFullMode(OperationExecutor operationExecutor, OperationHandle operationHandle, CreateMaterializedTableOperation createMaterializedTableOperation) {
        if (this.workflowScheduler == null) {
            throw new SqlExecutionException("The workflow scheduler must be configured when creating materialized table in full refresh mode.");
        }
        operationExecutor.callExecutableOperation(operationHandle, createMaterializedTableOperation);
        ObjectIdentifier tableIdentifier = createMaterializedTableOperation.getTableIdentifier();
        CatalogMaterializedTable catalogMaterializedTable = createMaterializedTableOperation.getCatalogMaterializedTable();
        try {
            RefreshHandler createRefreshWorkflow = this.workflowScheduler.createRefreshWorkflow(new CreatePeriodicRefreshWorkflow(tableIdentifier, catalogMaterializedTable.getDefinitionQuery(), IntervalFreshnessUtils.convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness()), getSessionInitializationConf(operationExecutor), Collections.emptyMap(), this.restEndpointUrl));
            updateRefreshHandler(operationExecutor, operationHandle, tableIdentifier, catalogMaterializedTable, CatalogMaterializedTable.RefreshStatus.ACTIVATED, createRefreshWorkflow.asSummaryString(), this.workflowScheduler.getRefreshHandlerSerializer().serialize(createRefreshWorkflow));
        } catch (Exception e) {
            try {
                operationExecutor.callExecutableOperation(operationHandle, new DropMaterializedTableOperation(tableIdentifier, true));
            } catch (Exception cleanupFailure) {
                // Keep the workflow failure as the primary error; the cleanup failure rides along.
                e.addSuppressed(cleanupFailure);
                LOG.warn("Failed to drop materialized table {} after its refresh workflow could not be created.", tableIdentifier, cleanupFailure);
            }
            throw new SqlExecutionException(String.format("Failed to create refresh workflow for materialized table %s.", tableIdentifier), e);
        }
    }

    /**
     * Suspends the refresh of a materialized table.
     *
     * <p>A table whose refresh status is INITIALIZING is rejected. Otherwise the continuous job is
     * suspended for CONTINUOUS tables and the refresh workflow for all other refresh modes.
     *
     * @throws SqlExecutionException if the table is still initializing
     */
    private ResultFetcher callAlterMaterializedTableSuspend(OperationExecutor operationExecutor, OperationHandle operationHandle, AlterMaterializedTableSuspendOperation alterMaterializedTableSuspendOperation) {
        final ObjectIdentifier identifier = alterMaterializedTableSuspendOperation.getTableIdentifier();
        final ResolvedCatalogMaterializedTable table =
                getCatalogMaterializedTable(operationExecutor, identifier);

        if (table.getRefreshStatus() == CatalogMaterializedTable.RefreshStatus.INITIALIZING) {
            throw new SqlExecutionException(
                    String.format(
                            "Materialized table %s is being initialized and does not support suspend operation.",
                            identifier));
        }

        final boolean continuous =
                table.getRefreshMode() == CatalogMaterializedTable.RefreshMode.CONTINUOUS;
        if (continuous) {
            suspendContinuousRefreshJob(operationExecutor, operationHandle, identifier, table);
        } else {
            suspendRefreshWorkflow(operationExecutor, operationHandle, identifier, table);
        }
        return ResultFetcher.fromTableResult(
                operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
    }

    /**
     * Stops the continuous refresh job with a savepoint and records the SUSPENDED status together
     * with a refresh handler that carries the value returned by {@code stopJobWithSavepoint}.
     *
     * <p>The "already suspended" error is raised inside the try block, so callers see it as the
     * cause of the generic "Failed to suspend" exception.
     *
     * @return the table returned by {@code updateRefreshHandler}
     * @throws SqlExecutionException if any step fails, including the already-suspended case
     */
    private CatalogMaterializedTable suspendContinuousRefreshJob(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable) {
        try {
            final ContinuousRefreshHandler currentHandler =
                    deserializeContinuousHandler(
                            catalogMaterializedTable.getSerializedRefreshHandler());

            final boolean alreadySuspended =
                    catalogMaterializedTable.getRefreshStatus()
                            == CatalogMaterializedTable.RefreshStatus.SUSPENDED;
            if (alreadySuspended) {
                throw new SqlExecutionException(
                        String.format(
                                "Materialized table %s continuous refresh job has been suspended, jobId is %s.",
                                objectIdentifier, currentHandler.getJobId()));
            }

            // Arguments are evaluated left to right, so the job is stopped only after the
            // identifying values have been read from the current handler.
            final ContinuousRefreshHandler suspendedHandler =
                    new ContinuousRefreshHandler(
                            currentHandler.getExecutionTarget(),
                            currentHandler.getClusterId(),
                            currentHandler.getJobId(),
                            stopJobWithSavepoint(operationExecutor, operationHandle, currentHandler));

            return updateRefreshHandler(
                    operationExecutor,
                    operationHandle,
                    objectIdentifier,
                    catalogMaterializedTable,
                    CatalogMaterializedTable.RefreshStatus.SUSPENDED,
                    suspendedHandler.asSummaryString(),
                    serializeContinuousHandler(suspendedHandler));
        } catch (Exception failure) {
            throw new SqlExecutionException(
                    String.format(
                            "Failed to suspend the continuous refresh job for materialized table %s.",
                            objectIdentifier),
                    failure);
        }
    }

    /**
     * Suspends the refresh workflow of a table and records the SUSPENDED status.
     *
     * <p>The serialized refresh handler is left unchanged; only the status and the handler
     * description are rewritten.
     *
     * @throws SqlExecutionException if the table is already suspended, if no workflow scheduler is
     *     configured, or if the scheduler call or the catalog update fails
     */
    private void suspendRefreshWorkflow(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable) {
        final boolean alreadySuspended =
                catalogMaterializedTable.getRefreshStatus()
                        == CatalogMaterializedTable.RefreshStatus.SUSPENDED;
        if (alreadySuspended) {
            throw new SqlExecutionException(
                    String.format(
                            "Materialized table %s refresh workflow has been suspended.",
                            objectIdentifier));
        }
        if (this.workflowScheduler == null) {
            throw new SqlExecutionException(
                    "The workflow scheduler must be configured when suspending materialized table in full refresh mode.");
        }

        try {
            RefreshHandler handler =
                    this.workflowScheduler
                            .getRefreshHandlerSerializer()
                            .deserialize(
                                    catalogMaterializedTable.getSerializedRefreshHandler(),
                                    this.userCodeClassLoader);
            this.workflowScheduler.modifyRefreshWorkflow(new SuspendRefreshWorkflow(handler));
            updateRefreshHandler(
                    operationExecutor,
                    operationHandle,
                    objectIdentifier,
                    catalogMaterializedTable,
                    CatalogMaterializedTable.RefreshStatus.SUSPENDED,
                    handler.asSummaryString(),
                    catalogMaterializedTable.getSerializedRefreshHandler());
        } catch (Exception failure) {
            throw new SqlExecutionException(
                    String.format(
                            "Failed to suspend the refresh workflow for materialized table %s.",
                            objectIdentifier),
                    failure);
        }
    }

    /**
     * Resumes the refresh of a materialized table.
     *
     * <p>A table whose refresh status is INITIALIZING is rejected. Otherwise the continuous job is
     * resumed for CONTINUOUS tables and the refresh workflow for all other refresh modes. The
     * operation's dynamic options are forwarded to whichever path is taken.
     *
     * @throws SqlExecutionException if the table is still initializing
     */
    private ResultFetcher callAlterMaterializedTableResume(OperationExecutor operationExecutor, OperationHandle operationHandle, AlterMaterializedTableResumeOperation alterMaterializedTableResumeOperation) {
        final ObjectIdentifier identifier = alterMaterializedTableResumeOperation.getTableIdentifier();
        final ResolvedCatalogMaterializedTable table =
                getCatalogMaterializedTable(operationExecutor, identifier);

        if (table.getRefreshStatus() == CatalogMaterializedTable.RefreshStatus.INITIALIZING) {
            throw new SqlExecutionException(
                    String.format(
                            "Materialized table %s is being initialized and does not support resume operation.",
                            identifier));
        }

        final boolean continuous =
                table.getRefreshMode() == CatalogMaterializedTable.RefreshMode.CONTINUOUS;
        if (continuous) {
            resumeContinuousRefreshJob(
                    operationExecutor,
                    operationHandle,
                    identifier,
                    table,
                    alterMaterializedTableResumeOperation.getDynamicOptions());
        } else {
            resumeRefreshWorkflow(
                    operationExecutor,
                    operationHandle,
                    identifier,
                    table,
                    alterMaterializedTableResumeOperation.getDynamicOptions());
        }
        return ResultFetcher.fromTableResult(
                operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
    }

    /**
     * Restarts the continuous refresh job of a table, restoring from the path stored in its
     * current refresh handler.
     *
     * <p>An ACTIVATED table is only restarted when its job has reached a globally terminal state.
     * The job status is queried only for ACTIVATED tables.
     *
     * @param map dynamic options forwarded to the refresh job
     * @throws SqlExecutionException if the job is still running, or if the restart fails
     */
    private void resumeContinuousRefreshJob(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable, Map<String, String> map) {
        final ContinuousRefreshHandler currentHandler =
                deserializeContinuousHandler(catalogMaterializedTable.getSerializedRefreshHandler());

        if (catalogMaterializedTable.getRefreshStatus()
                == CatalogMaterializedTable.RefreshStatus.ACTIVATED) {
            final boolean jobStillAlive =
                    !getJobStatus(operationExecutor, operationHandle, currentHandler)
                            .isGloballyTerminalState();
            if (jobStillAlive) {
                throw new SqlExecutionException(
                        String.format(
                                "Materialized table %s continuous refresh job has been resumed, jobId is %s.",
                                objectIdentifier, currentHandler.getJobId()));
            }
        }

        try {
            executeContinuousRefreshJob(
                    operationExecutor,
                    operationHandle,
                    catalogMaterializedTable,
                    objectIdentifier,
                    map,
                    currentHandler.getRestorePath());
        } catch (Exception failure) {
            throw new SqlExecutionException(
                    String.format(
                            "Failed to resume the continuous refresh job for materialized table %s.",
                            objectIdentifier),
                    failure);
        }
    }

    /**
     * Resumes the refresh workflow of a table and records the ACTIVATED status.
     *
     * <p>The serialized refresh handler is left unchanged; only the status and the handler
     * description are rewritten.
     *
     * @param map dynamic options handed to the resume request
     * @throws SqlExecutionException if the table is already activated, if no workflow scheduler is
     *     configured, or if the scheduler call or the catalog update fails
     */
    private void resumeRefreshWorkflow(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable, Map<String, String> map) {
        final boolean alreadyActive =
                catalogMaterializedTable.getRefreshStatus()
                        == CatalogMaterializedTable.RefreshStatus.ACTIVATED;
        if (alreadyActive) {
            throw new SqlExecutionException(
                    String.format(
                            "Materialized table %s refresh workflow has been resumed.",
                            objectIdentifier));
        }
        if (this.workflowScheduler == null) {
            throw new SqlExecutionException(
                    "The workflow scheduler must be configured when resuming materialized table in full refresh mode.");
        }

        try {
            RefreshHandler handler =
                    this.workflowScheduler
                            .getRefreshHandlerSerializer()
                            .deserialize(
                                    catalogMaterializedTable.getSerializedRefreshHandler(),
                                    this.userCodeClassLoader);
            this.workflowScheduler.modifyRefreshWorkflow(new ResumeRefreshWorkflow(handler, map));
            updateRefreshHandler(
                    operationExecutor,
                    operationHandle,
                    objectIdentifier,
                    catalogMaterializedTable,
                    CatalogMaterializedTable.RefreshStatus.ACTIVATED,
                    handler.asSummaryString(),
                    catalogMaterializedTable.getSerializedRefreshHandler());
        } catch (Exception failure) {
            throw new SqlExecutionException(
                    String.format(
                            "Failed to resume the refresh workflow for materialized table %s.",
                            objectIdentifier),
                    failure);
        }
    }

    /**
     * Submits the streaming refresh job for a table and records the resulting handler with status
     * ACTIVATED.
     *
     * <p>The job is named after the table and runs in STREAMING mode. The table's freshness is used
     * as the checkpointing interval unless the session configuration already defines one.
     *
     * @param map dynamic options used when building the insert statement
     * @param optional savepoint path to restore from, if any
     */
    private void executeContinuousRefreshJob(OperationExecutor operationExecutor, OperationHandle operationHandle, CatalogMaterializedTable catalogMaterializedTable, ObjectIdentifier objectIdentifier, Map<String, String> map, Optional<String> optional) {
        final Configuration jobConf = new Configuration();
        jobConf.set(
                PipelineOptions.NAME,
                String.format(
                        "Materialized_table_%s_continuous_refresh_job",
                        objectIdentifier.asSerializableString()));
        jobConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        optional.ifPresent(
                savepointPath -> jobConf.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath));

        final boolean sessionDefinesInterval =
                operationExecutor
                        .getSessionContext()
                        .getSessionConf()
                        .contains(CheckpointingOptions.CHECKPOINTING_INTERVAL);
        if (!sessionDefinesInterval) {
            jobConf.set(
                    CheckpointingOptions.CHECKPOINTING_INTERVAL,
                    catalogMaterializedTable.getFreshness());
        }

        final JobExecutionResult submitted =
                executeRefreshJob(
                        getInsertStatement(
                                objectIdentifier,
                                catalogMaterializedTable.getDefinitionQuery(),
                                map),
                        jobConf,
                        operationExecutor,
                        operationHandle);

        final ContinuousRefreshHandler newHandler =
                new ContinuousRefreshHandler(
                        submitted.executionTarget, submitted.clusterId, submitted.jobId);
        updateRefreshHandler(
                operationExecutor,
                operationHandle,
                objectIdentifier,
                catalogMaterializedTable,
                CatalogMaterializedTable.RefreshStatus.ACTIVATED,
                newHandler.asSummaryString(),
                serializeContinuousHandler(newHandler));
    }

    /**
     * Runs a one-time refresh for the table and partition spec named by the operation.
     *
     * <p>No dynamic options and no scheduler time are passed, and the periodic flag is false.
     */
    private ResultFetcher callAlterMaterializedTableRefreshOperation(OperationExecutor operationExecutor, OperationHandle operationHandle, AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
        final boolean periodic = false;
        final String scheduleTime = null;
        return refreshMaterializedTable(
                operationExecutor,
                operationHandle,
                alterMaterializedTableRefreshOperation.getTableIdentifier(),
                alterMaterializedTableRefreshOperation.getPartitionSpec(),
                Collections.emptyMap(),
                periodic,
                scheduleTime);
    }

    /**
     * Submits a batch job that refreshes the materialized table and returns a single row with the
     * job id and a map of cluster information.
     *
     * <p>For a periodic refresh the partitions are derived from the scheduler time and the table
     * options; otherwise the given partition spec is used as is. The partition spec is validated
     * before the job is submitted.
     *
     * @param map partition spec used for a non-periodic refresh
     * @param map2 dynamic options used when building the refresh statement
     * @param z whether this is a periodic refresh
     * @param str scheduler time; only read for a periodic refresh
     * @return one row holding the job id and the cluster information map
     * @throws SqlExecutionException if submitting the refresh job or building the result fails
     */
    public ResultFetcher refreshMaterializedTable(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, Map<String, String> map, Map<String, String> map2, boolean z, @Nullable String str) {
        final ResolvedCatalogMaterializedTable table =
                getCatalogMaterializedTable(operationExecutor, objectIdentifier);

        final Map<String, String> refreshPartitions;
        if (z) {
            refreshPartitions =
                    getPeriodRefreshPartition(
                            str,
                            table.getDefinitionFreshness(),
                            objectIdentifier,
                            table.getOptions(),
                            operationExecutor.getTableEnvironment().getConfig().getLocalTimeZone());
        } else {
            refreshPartitions = map;
        }
        validatePartitionSpec(refreshPartitions, table);

        final String jobName;
        if (z) {
            jobName =
                    String.format(
                            "Materialized_table_%s_periodic_refresh_job",
                            objectIdentifier.asSerializableString());
        } else {
            jobName =
                    String.format(
                            "Materialized_table_%s_one_time_refresh_job",
                            objectIdentifier.asSerializableString());
        }
        final Configuration jobConf = new Configuration();
        jobConf.set(PipelineOptions.NAME, jobName);
        jobConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        final String statement =
                getRefreshStatement(
                        objectIdentifier, table.getDefinitionQuery(), refreshPartitions, map2);
        try {
            LOG.info("Begin to refreshing the materialized table {}, statement: {}", objectIdentifier, statement);
            final JobExecutionResult submitted =
                    executeRefreshJob(statement, jobConf, operationExecutor, operationHandle);

            final Map<StringData, StringData> clusterInfo = new HashMap<>();
            clusterInfo.put(
                    StringData.fromString(DeploymentOptions.TARGET.key()),
                    StringData.fromString(submitted.executionTarget));
            // The cluster id is only reported when a key name is known for the execution target.
            getClusterIdKeyName(submitted.executionTarget)
                    .ifPresent(
                            clusterIdKey ->
                                    clusterInfo.put(
                                            StringData.fromString(clusterIdKey),
                                            StringData.fromString(submitted.clusterId)));

            final ResolvedSchema resultSchema =
                    ResolvedSchema.of(
                            Column.physical(Constants.JOB_ID, DataTypes.STRING()),
                            Column.physical(
                                    Constants.CLUSTER_INFO,
                                    DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())));
            final RowData resultRow =
                    GenericRowData.of(
                            StringData.fromString(submitted.jobId),
                            new GenericMapData(clusterInfo));
            return ResultFetcher.fromResults(
                    operationHandle, resultSchema, Collections.singletonList(resultRow));
        } catch (Exception failure) {
            throw new SqlExecutionException(
                    String.format(
                            "Refreshing the materialized table %s occur exception.",
                            objectIdentifier),
                    failure);
        }
    }

    /**
     * Computes the partition values for a periodic refresh from the scheduler time.
     *
     * <p>Every option whose key has the form {@code partition.fields.<field>.date-formatter}
     * contributes one entry: the field name mapped to the scheduler time, shifted back by the
     * freshness interval and formatted with the option value in the given time zone.
     *
     * <p>Keys that merely start with {@code partition.fields} but do not match that form are
     * ignored. Previously they went through the same substring arithmetic and either produced a
     * bogus field name or failed with {@code StringIndexOutOfBoundsException}.
     *
     * @param str scheduler time; must not be null
     * @param intervalFreshness freshness whose duration is subtracted from the scheduler time
     * @param objectIdentifier table identifier, used in error messages
     * @param map table options to scan for date-formatter entries
     * @param zoneId time zone used when formatting
     * @return partition field name to formatted partition value; empty if no option matches
     * @throws ValidationException if the scheduler time is null
     * @throws SqlExecutionException if a partition value cannot be derived from the scheduler time
     */
    @VisibleForTesting
    static Map<String, String> getPeriodRefreshPartition(String str, IntervalFreshness intervalFreshness, ObjectIdentifier objectIdentifier, Map<String, String> map, ZoneId zoneId) {
        if (str == null) {
            throw new ValidationException(String.format("The scheduler time must not be null during the periodic refresh of the materialized table %s.", objectIdentifier));
        }
        final String keyPrefix = "partition.fields" + ".";
        final String keySuffix = "." + "date-formatter";
        Map<String, String> refreshPartitions = new HashMap<>();
        for (Map.Entry<String, String> option : map.entrySet()) {
            String key = option.getKey();
            // Require prefix, suffix and a non-empty field name in between.
            boolean isDateFormatterKey =
                    key.length() > keyPrefix.length() + keySuffix.length()
                            && key.startsWith(keyPrefix)
                            && key.endsWith(keySuffix);
            if (!isDateFormatterKey) {
                continue;
            }
            String partitionField = key.substring(keyPrefix.length(), key.length() - keySuffix.length());
            String partitionValue = DateTimeUtils.formatTimestampStringWithOffset(str, QuartzSchedulerUtils.SCHEDULE_TIME_FORMAT, option.getValue(), TimeZone.getTimeZone(zoneId), -IntervalFreshnessUtils.convertFreshnessToDuration(intervalFreshness).toMillis());
            if (partitionValue == null) {
                throw new SqlExecutionException(String.format("Failed to parse a valid partition value for the field '%s' in materialized table %s using the scheduler time '%s' based on the date format '%s'.", partitionField, objectIdentifier.asSerializableString(), str, QuartzSchedulerUtils.SCHEDULE_TIME_FORMAT));
            }
            refreshPartitions.put(partitionField, partitionValue);
        }
        return refreshPartitions;
    }

    /**
     * Checks that every key of the partition spec names a column of the table's resolved schema
     * and that the column belongs to the character-string type family.
     *
     * <p>Unknown keys are reported first; unsupported types are only reported when all keys are
     * known.
     *
     * @param map partition spec to validate
     * @param resolvedCatalogMaterializedTable table whose schema and partition keys are consulted
     * @throws ValidationException if a key is unknown or refers to a non-string column
     */
    private void validatePartitionSpec(Map<String, String> map, ResolvedCatalogMaterializedTable resolvedCatalogMaterializedTable) {
        final ResolvedSchema schema = resolvedCatalogMaterializedTable.getResolvedSchema();
        final Set<String> knownPartitionKeys =
                new HashSet<>(resolvedCatalogMaterializedTable.getPartitionKeys());
        final Set<String> unknownKeys = new HashSet<>();
        final Set<String> nonStringKeys = new HashSet<>();

        for (String partitionKey : map.keySet()) {
            final Optional<Column> column = schema.getColumn(partitionKey);
            if (!column.isPresent()) {
                unknownKeys.add(partitionKey);
                continue;
            }
            final boolean isCharacterString =
                    column.get()
                            .getDataType()
                            .getLogicalType()
                            .getTypeRoot()
                            .getFamilies()
                            .contains(LogicalTypeFamily.CHARACTER_STRING);
            if (!isCharacterString) {
                nonStringKeys.add(partitionKey);
            }
        }

        if (!unknownKeys.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "The partition spec contains unknown partition keys:\n\n%s\n\nAll known partition keys are:\n\n%s",
                            String.join("\n", unknownKeys),
                            String.join("\n", knownPartitionKeys)));
        }
        if (!nonStringKeys.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Currently, refreshing materialized table only supports referring to char, varchar and string type partition keys. All specified partition keys in partition specs with unsupported types are:\n\n%s",
                            String.join("\n", nonStringKeys)));
        }
    }

    /**
     * Builds the {@code INSERT OVERWRITE} statement used to refresh a materialized table.
     *
     * <p>The definition query is wrapped in a sub-select. When the partition spec is non-empty, a
     * {@code WHERE} clause with one {@code key = 'value'} predicate per entry, joined by
     * {@code AND}, restricts the refresh to those partitions.
     *
     * <p>Single quotes inside a partition value are doubled so the value stays a single SQL string
     * literal. Previously a value containing {@code '} produced a malformed or altered statement.
     * Values without quotes yield exactly the same statement as before.
     *
     * @param objectIdentifier table to overwrite
     * @param str definition query of the table
     * @param map partition spec; may be empty
     * @param map2 dynamic options applied to the target table
     * @return the refresh statement
     */
    @VisibleForTesting
    protected static String getRefreshStatement(ObjectIdentifier objectIdentifier, String str, Map<String, String> map, Map<String, String> map2) {
        StringBuilder statement = new StringBuilder(String.format("INSERT OVERWRITE %s\n  SELECT * FROM (%s)", generateTableWithDynamicOptions(objectIdentifier, map2), str));
        if (!map.isEmpty()) {
            String predicates =
                    map.entrySet().stream()
                            .map(
                                    entry ->
                                            String.format(
                                                    "%s = '%s'",
                                                    entry.getKey(),
                                                    // String.valueOf keeps a null value rendering as "null", as before.
                                                    String.valueOf(entry.getValue()).replace("'", "''")))
                            .collect(Collectors.joining(" AND "));
            statement.append("\n  WHERE ").append(predicates);
        }
        return statement.toString();
    }

    /**
     * Applies an {@code AlterMaterializedTableAsQueryOperation}, i.e. replaces the table's
     * definition with the operation's new materialized table.
     *
     * <ul>
     *   <li>FULL refresh mode: only the catalog change is executed and its result returned.
     *   <li>Other modes, status ACTIVATED: the running job is suspended, the catalog is altered,
     *       and a new continuous job is started without a restore path. If that start fails, the
     *       change is rolled back, the old query is restarted from the restore path stored in the
     *       suspended handler, and a {@code SqlExecutionException} is thrown.
     *   <li>Other modes, status SUSPENDED: the catalog is altered with one extra table change
     *       obtained from {@code generateResetSavepointTableChange}.
     *   <li>Any other status: rejected as "being initialized".
     * </ul>
     *
     * <p>NOTE(review): the rollback steps are not guarded. If one of them throws, that exception
     * propagates and the original start failure is only visible in the warning log.
     *
     * @throws SqlExecutionException if the new job cannot be started, or the table is initializing
     */
    private ResultFetcher callAlterMaterializedTableAsQueryOperation(OperationExecutor operationExecutor, OperationHandle operationHandle, AlterMaterializedTableAsQueryOperation alterMaterializedTableAsQueryOperation) {
        ObjectIdentifier tableIdentifier = alterMaterializedTableAsQueryOperation.getTableIdentifier();
        ResolvedCatalogMaterializedTable catalogMaterializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier);
        // FULL mode: no job to restart here, so the catalog change alone is executed.
        if (CatalogMaterializedTable.RefreshMode.FULL == catalogMaterializedTable.getRefreshMode()) {
            return operationExecutor.callExecutableOperation(operationHandle, new AlterMaterializedTableChangeOperation(tableIdentifier, alterMaterializedTableAsQueryOperation.getTableChanges(), alterMaterializedTableAsQueryOperation.getNewMaterializedTable()));
        }
        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == catalogMaterializedTable.getRefreshStatus()) {
            // Stop the running job first; the returned table carries the suspended status and handler.
            CatalogMaterializedTable suspendContinuousRefreshJob = suspendContinuousRefreshJob(operationExecutor, operationHandle, tableIdentifier, catalogMaterializedTable);
            // The new definition inherits status, handler description and handler bytes from the suspended table.
            CatalogMaterializedTable copy = alterMaterializedTableAsQueryOperation.getNewMaterializedTable().copy(suspendContinuousRefreshJob.getRefreshStatus(), (String) suspendContinuousRefreshJob.getRefreshHandlerDescription().orElse(null), suspendContinuousRefreshJob.getSerializedRefreshHandler());
            AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = new AlterMaterializedTableChangeOperation(tableIdentifier, alterMaterializedTableAsQueryOperation.getTableChanges(), copy);
            operationExecutor.callExecutableOperation(operationHandle, alterMaterializedTableChangeOperation);
            try {
                // Start the job for the new query without restoring from a savepoint.
                executeContinuousRefreshJob(operationExecutor, operationHandle, copy, tableIdentifier, Collections.emptyMap(), Optional.empty());
            } catch (Exception e) {
                LOG.warn("Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.", new Object[]{tableIdentifier, alterMaterializedTableAsQueryOperation.getNewMaterializedTable().getDefinitionQuery(), suspendContinuousRefreshJob.getDefinitionQuery(), e});
                // Roll back the catalog change, then restart the previous query from its stored restore path.
                operationExecutor.callExecutableOperation(operationHandle, generateRollbackAlterMaterializedTableOperation(suspendContinuousRefreshJob, alterMaterializedTableChangeOperation));
                executeContinuousRefreshJob(operationExecutor, operationHandle, suspendContinuousRefreshJob, tableIdentifier, Collections.emptyMap(), deserializeContinuousHandler(suspendContinuousRefreshJob.getSerializedRefreshHandler()).getRestorePath());
                throw new SqlExecutionException(String.format("Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.", alterMaterializedTableAsQueryOperation.getNewMaterializedTable().getDefinitionQuery(), tableIdentifier), e);
            }
        } else {
            // Neither ACTIVATED nor SUSPENDED: the table is treated as still initializing.
            if (CatalogMaterializedTable.RefreshStatus.SUSPENDED != catalogMaterializedTable.getRefreshStatus()) {
                throw new SqlExecutionException(String.format("Materialized table %s is being initialized and does not support alter operation.", tableIdentifier));
            }
            // Copy the changes so the extra refresh-handler change can be appended.
            ArrayList arrayList = new ArrayList(alterMaterializedTableAsQueryOperation.getTableChanges());
            // NOTE(review): judging by its name this change clears the stored savepoint; the helper is not visible here.
            TableChange.ModifyRefreshHandler generateResetSavepointTableChange = generateResetSavepointTableChange(catalogMaterializedTable.getSerializedRefreshHandler());
            arrayList.add(generateResetSavepointTableChange);
            operationExecutor.callExecutableOperation(operationHandle, new AlterMaterializedTableChangeOperation(tableIdentifier, arrayList, alterMaterializedTableAsQueryOperation.getNewMaterializedTable().copy(catalogMaterializedTable.getRefreshStatus(), generateResetSavepointTableChange.getRefreshHandlerDesc(), generateResetSavepointTableChange.getRefreshHandlerBytes())));
        }
        return ResultFetcher.fromTableResult(operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
    }

    /**
     * Builds the operation that undoes a previously applied alter-query change set.
     *
     * <p>Each change of the applied operation is mapped to its inverse: an added column becomes a
     * drop of that column, a refresh-handler modification is reset to the handler stored in the
     * original table, and a definition-query modification is reset to the original query.
     *
     * @param catalogMaterializedTable the table as it was before the alter operation; it is the
     *     source of the restored handler and query and the target table of the returned operation
     * @param alterMaterializedTableChangeOperation the operation whose changes must be rolled back
     * @return an alter operation carrying the inverse changes, in the same order as the originals
     * @throws ValidationException if a change of any other kind is encountered
     */
    private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation(CatalogMaterializedTable catalogMaterializedTable, AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation) {
        List<TableChange> rollbackChanges = new ArrayList<>();
        // Iterate with the common TableChange type: the list mixes several change kinds, so the
        // narrowing to AddColumn must only happen inside the matching branch.
        for (TableChange tableChange : alterMaterializedTableChangeOperation.getTableChanges()) {
            if (tableChange instanceof TableChange.AddColumn) {
                TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;
                rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
            } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
                rollbackChanges.add(TableChange.modifyRefreshHandler(catalogMaterializedTable.getRefreshHandlerDescription().orElse(null), catalogMaterializedTable.getSerializedRefreshHandler()));
            } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
                rollbackChanges.add(TableChange.modifyDefinitionQuery(catalogMaterializedTable.getDefinitionQuery()));
            } else {
                throw new ValidationException(String.format("Failed to generate rollback changes for materialized table '%s'. Unsupported table change detected: %s. ", alterMaterializedTableChangeOperation.getTableIdentifier(), tableChange));
            }
        }
        return new AlterMaterializedTableChangeOperation(alterMaterializedTableChangeOperation.getTableIdentifier(), rollbackChanges, catalogMaterializedTable);
    }

    /**
     * Creates a refresh-handler change from a handler rebuilt out of the serialized one.
     *
     * <p>The new handler is constructed from the execution target, cluster id and job id of the
     * deserialized handler only. NOTE(review): this presumably drops any stored restore
     * (savepoint) path - confirm against the {@code ContinuousRefreshHandler} constructors.
     *
     * @param bArr serialized form of the current continuous refresh handler
     * @return a change carrying the summary string and serialized bytes of the rebuilt handler
     */
    private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(byte[] bArr) {
        final ContinuousRefreshHandler current = deserializeContinuousHandler(bArr);
        final ContinuousRefreshHandler rebuilt =
                new ContinuousRefreshHandler(
                        current.getExecutionTarget(), current.getClusterId(), current.getJobId());
        final String description = rebuilt.asSummaryString();
        final byte[] serialized = serializeContinuousHandler(rebuilt);
        return TableChange.modifyRefreshHandler(description, serialized);
    }

    /**
     * Drops a materialized table after cleaning up its background refresh.
     *
     * <p>If the table does not exist, the call either succeeds silently ({@code IF EXISTS}) or
     * fails. For an existing table in ACTIVATED or SUSPENDED status, the refresh workflow is
     * deleted in FULL mode, and the refresh job is cancelled in CONTINUOUS mode when the table is
     * ACTIVATED. A table in INITIALIZING status cannot be dropped.
     *
     * @return an OK result once the drop operation has been executed or skipped
     * @throws ValidationException if the table is missing without {@code IF EXISTS}, or if it is
     *     still initializing
     */
    private ResultFetcher callDropMaterializedTableOperation(OperationExecutor operationExecutor, OperationHandle operationHandle, DropMaterializedTableOperation dropMaterializedTableOperation) {
        final ObjectIdentifier identifier = dropMaterializedTableOperation.getTableIdentifier();

        if (!operationExecutor.tableExists(identifier)) {
            if (dropMaterializedTableOperation.isIfExists()) {
                LOG.info("Materialized table {} does not exists, skip the drop operation.", identifier);
                return ResultFetcher.fromTableResult(operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
            }
            throw new ValidationException(String.format("Materialized table with identifier %s does not exist.", identifier));
        }

        final ResolvedCatalogMaterializedTable materializedTable = getCatalogMaterializedTable(operationExecutor, identifier);
        final CatalogMaterializedTable.RefreshMode mode = materializedTable.getRefreshMode();
        final CatalogMaterializedTable.RefreshStatus status = materializedTable.getRefreshStatus();
        final boolean activated = CatalogMaterializedTable.RefreshStatus.ACTIVATED == status;

        if (activated || CatalogMaterializedTable.RefreshStatus.SUSPENDED == status) {
            if (CatalogMaterializedTable.RefreshMode.FULL == mode) {
                deleteRefreshWorkflow(identifier, materializedTable);
            } else if (activated && CatalogMaterializedTable.RefreshMode.CONTINUOUS == mode) {
                // Only an activated continuous table is handed to the cancel routine.
                cancelContinuousRefreshJob(operationExecutor, operationHandle, identifier, materializedTable);
            }
        } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING == status) {
            throw new ValidationException(String.format("Current refresh status of materialized table %s is initializing, skip the drop operation.", identifier.asSerializableString()));
        }

        operationExecutor.callExecutableOperation(operationHandle, dropMaterializedTableOperation);
        return ResultFetcher.fromTableResult(operationHandle, TableResultInternal.TABLE_RESULT_OK, false);
    }

    /**
     * Cancels the continuous refresh job referenced by the table's serialized refresh handler.
     *
     * <p>Nothing is done when the job is already in a terminal state. If the cancel call fails,
     * the job status is queried again: a job that has reached a terminal state in the meantime is
     * treated as cancelled (the failure is only logged), otherwise the failure is propagated.
     *
     * @param operationExecutor executor used to query and stop the job
     * @param operationHandle handle of the surrounding operation
     * @param objectIdentifier identifier of the materialized table, used in messages only
     * @param catalogMaterializedTable table whose serialized refresh handler identifies the job
     * @throws SqlExecutionException if cancelling fails and the job is still not terminal
     */
    private void cancelContinuousRefreshJob(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable) {
        ContinuousRefreshHandler refreshHandler = deserializeContinuousHandler(catalogMaterializedTable.getSerializedRefreshHandler());
        if (getJobStatus(operationExecutor, operationHandle, refreshHandler).isTerminalState()) {
            LOG.info("No need to cancel the continuous refresh job {} for materialized table {} as it is not currently running.", refreshHandler.getJobId(), objectIdentifier);
            return;
        }
        try {
            cancelJob(operationExecutor, operationHandle, refreshHandler);
        } catch (Exception e) {
            // The job may have finished between the status check and the cancel request, so
            // re-read the status before deciding whether the failure matters.
            JobStatus jobStatus = getJobStatus(operationExecutor, operationHandle, refreshHandler);
            if (!jobStatus.isTerminalState()) {
                throw new SqlExecutionException(String.format("Failed to drop the materialized table %s because the continuous refresh job %s could not be canceled. The current status of the continuous refresh job is %s.", objectIdentifier, refreshHandler.getJobId(), jobStatus), e);
            }
            // Pass the exception as the trailing argument so its stack trace is not lost.
            LOG.warn("An exception occurred while canceling the continuous refresh job {} for materialized table {}, but since the job is in a terminal state, skip the cancel operation.", refreshHandler.getJobId(), objectIdentifier, e);
        }
    }

    /**
     * Deletes the refresh workflow of a materialized table through the workflow scheduler.
     *
     * <p>The refresh handler is deserialized from the table with the scheduler's own serializer
     * and the user code class loader, then wrapped in a {@code DeleteRefreshWorkflow} request.
     *
     * @param objectIdentifier identifier of the materialized table, used in the error message
     * @param catalogMaterializedTable table holding the serialized refresh handler
     * @throws SqlExecutionException if no workflow scheduler is configured, or if deserializing
     *     the handler or deleting the workflow fails
     */
    private void deleteRefreshWorkflow(ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable) {
        if (null == this.workflowScheduler) {
            throw new SqlExecutionException("The workflow scheduler must be configured when dropping materialized table in full refresh mode.");
        }
        try {
            final byte[] serializedHandler = catalogMaterializedTable.getSerializedRefreshHandler();
            this.workflowScheduler.deleteRefreshWorkflow(
                    new DeleteRefreshWorkflow(
                            this.workflowScheduler
                                    .getRefreshHandlerSerializer()
                                    .deserialize(serializedHandler, this.userCodeClassLoader)));
        } catch (Exception cause) {
            throw new SqlExecutionException(String.format("Failed to delete the refresh workflow for materialized table %s.", objectIdentifier), cause);
        }
    }

    /**
     * Collects the session-level configuration entries that differ from the default context.
     *
     * <p>Starting from the session configuration, the following entries are removed: entries
     * whose key exists in the default context's Flink configuration with an equal value, the
     * {@code TableConfigOptions.RESOURCES_DOWNLOAD_DIR} key, and every key starting with
     * {@code workflow-scheduler}.
     *
     * @param operationExecutor executor giving access to the session context
     * @return the filtered session configuration as a key/value map
     */
    private Map<String, String> getSessionInitializationConf(OperationExecutor operationExecutor) {
        final Map<String, String> sessionConf = operationExecutor.getSessionContext().getSessionConf().toMap();
        final Map<String, String> defaultConf = operationExecutor.getSessionContext().getDefaultContext().getFlinkConfig().toMap();

        // Drop everything that merely repeats a default value.
        sessionConf.entrySet().removeIf(option -> defaultConf.containsKey(option.getKey()) && defaultConf.get(option.getKey()).equals(option.getValue()));
        sessionConf.remove(TableConfigOptions.RESOURCES_DOWNLOAD_DIR.key());
        sessionConf.keySet().removeIf(key -> key.startsWith("workflow-scheduler"));
        return sessionConf;
    }

    /**
     * Queries the current status of the job referenced by the given refresh handler.
     *
     * <p>A describe-job operation is run against a table environment built for the handler's
     * cluster; the status name is read from column index 2 of the first result row and parsed
     * with {@link JobStatus#valueOf(String)}.
     *
     * @return the parsed job status
     */
    private static JobStatus getJobStatus(OperationExecutor operationExecutor, OperationHandle operationHandle, ContinuousRefreshHandler continuousRefreshHandler) {
        TableEnvironmentInternal tableEnv = getTableEnvironment(operationExecutor, continuousRefreshHandler);
        DescribeJobOperation describeJob = new DescribeJobOperation(continuousRefreshHandler.getJobId());
        ResultFetcher fetcher = operationExecutor.callDescribeJobOperation(tableEnv, operationHandle, describeJob);
        List<RowData> rows = fetchAllResults(fetcher);
        String statusName = rows.get(0).getString(2).toString();
        return JobStatus.valueOf(statusName);
    }

    /**
     * Issues a stop-job operation for the job referenced by the refresh handler, with both
     * boolean flags of {@code StopJobOperation} set to {@code false}.
     */
    private static void cancelJob(OperationExecutor operationExecutor, OperationHandle operationHandle, ContinuousRefreshHandler continuousRefreshHandler) {
        TableEnvironmentInternal tableEnv = getTableEnvironment(operationExecutor, continuousRefreshHandler);
        StopJobOperation stopJob = new StopJobOperation(continuousRefreshHandler.getJobId(), false, false);
        operationExecutor.callStopJobOperation(tableEnv, operationHandle, stopJob);
    }

    /**
     * Stops the job referenced by the refresh handler using a stop-job operation whose first
     * boolean flag is {@code true}.
     *
     * <p>The session configuration must define {@code CheckpointingOptions.SAVEPOINT_DIRECTORY}.
     *
     * @return the string in column 0 of the first result row (presumably the savepoint path -
     *     confirm against the stop-job result schema)
     * @throws ValidationException if no savepoint directory is configured for the session
     */
    private static String stopJobWithSavepoint(OperationExecutor operationExecutor, OperationHandle operationHandle, ContinuousRefreshHandler continuousRefreshHandler) {
        boolean savepointDirConfigured = operationExecutor.getSessionContext().getSessionConf().getOptional(CheckpointingOptions.SAVEPOINT_DIRECTORY).isPresent();
        if (!savepointDirConfigured) {
            throw new ValidationException("Savepoint directory is not configured, can't stop job with savepoint.");
        }
        TableEnvironmentInternal tableEnv = getTableEnvironment(operationExecutor, continuousRefreshHandler);
        StopJobOperation stopJob = new StopJobOperation(continuousRefreshHandler.getJobId(), true, false);
        List<RowData> rows = fetchAllResults(operationExecutor.callStopJobOperation(tableEnv, operationHandle, stopJob));
        return rows.get(0).getString(0).toString();
    }

    /**
     * Creates a table environment configured for the cluster described by the refresh handler.
     *
     * <p>The configuration contains the handler's execution target and, when the target maps to
     * a cluster-id option key, the handler's cluster id under that key.
     *
     * @return a table environment built from the session's resource manager and that configuration
     */
    private static TableEnvironmentInternal getTableEnvironment(OperationExecutor operationExecutor, ContinuousRefreshHandler continuousRefreshHandler) {
        final String target = continuousRefreshHandler.getExecutionTarget();
        final Configuration clusterConf = new Configuration();
        clusterConf.set(DeploymentOptions.TARGET, target);

        final Optional<String> clusterIdKey = getClusterIdKeyName(target);
        if (clusterIdKey.isPresent()) {
            clusterConf.setString(clusterIdKey.get(), continuousRefreshHandler.getClusterId());
        }
        return operationExecutor.getTableEnvironment(operationExecutor.getSessionContext().getSessionState().resourceManager, clusterConf);
    }

    /**
     * Deserializes a continuous refresh handler with the user code class loader.
     *
     * @param bArr serialized handler bytes
     * @return the deserialized handler
     * @throws SqlExecutionException wrapping any I/O or class-resolution failure
     */
    private ContinuousRefreshHandler deserializeContinuousHandler(byte[] bArr) {
        final ContinuousRefreshHandler handler;
        try {
            handler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(bArr, this.userCodeClassLoader);
        } catch (IOException | ClassNotFoundException cause) {
            throw new SqlExecutionException("Deserialize ContinuousRefreshHandler occur exception.", cause);
        }
        return handler;
    }

    /**
     * Serializes a continuous refresh handler to bytes.
     *
     * @param continuousRefreshHandler handler to serialize
     * @return the serialized form
     * @throws SqlExecutionException wrapping any I/O failure raised by the serializer
     */
    private byte[] serializeContinuousHandler(ContinuousRefreshHandler continuousRefreshHandler) {
        final byte[] serialized;
        try {
            serialized = ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
        } catch (IOException cause) {
            throw new SqlExecutionException("Serialize ContinuousRefreshHandler occur exception.", cause);
        }
        return serialized;
    }

    /**
     * Looks up a table and returns it as a resolved materialized table.
     *
     * @param operationExecutor executor used for the catalog lookup
     * @param objectIdentifier identifier of the table to fetch
     * @return the table, narrowed to {@link ResolvedCatalogMaterializedTable}
     * @throws ValidationException if the table's kind is not {@code MATERIALIZED_TABLE}
     */
    private ResolvedCatalogMaterializedTable getCatalogMaterializedTable(OperationExecutor operationExecutor, ObjectIdentifier objectIdentifier) {
        // Keep the lookup result in the general table type: narrowing before the kind check
        // would turn a non-materialized table into a cast failure instead of the validation
        // error below.
        CatalogBaseTable table = operationExecutor.getTable(objectIdentifier);
        if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE != table.getTableKind()) {
            throw new ValidationException(String.format("Table %s is not a materialized table, does not support materialized table related operation.", objectIdentifier));
        }
        return (ResolvedCatalogMaterializedTable) table;
    }

    /**
     * Persists a new refresh status and refresh handler for a materialized table.
     *
     * <p>An alter operation with a refresh-status change and a refresh-handler change is executed
     * against a copy of the table that already carries the new values.
     *
     * @param refreshStatus status to store
     * @param str description of the new refresh handler
     * @param bArr serialized form of the new refresh handler
     * @return the updated copy of the table
     */
    private CatalogMaterializedTable updateRefreshHandler(OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier objectIdentifier, CatalogMaterializedTable catalogMaterializedTable, CatalogMaterializedTable.RefreshStatus refreshStatus, String str, byte[] bArr) {
        final CatalogMaterializedTable updatedTable = catalogMaterializedTable.copy(refreshStatus, str, bArr);

        final List<TableChange> changes = new ArrayList<>();
        changes.add(TableChange.modifyRefreshStatus(refreshStatus));
        changes.add(TableChange.modifyRefreshHandler(str, bArr));

        final AlterMaterializedTableChangeOperation alterOperation = new AlterMaterializedTableChangeOperation(objectIdentifier, changes, updatedTable);
        operationExecutor.callExecutableOperation(operationHandle, alterOperation);
        return updatedTable;
    }

    /**
     * Renders the {@code INSERT INTO} statement that writes a query's result into a table.
     *
     * @param objectIdentifier target table
     * @param str the query text, placed on the line after the {@code INSERT INTO} clause
     * @param map dynamic options rendered as an {@code OPTIONS} hint on the target table; an
     *     empty map adds no hint
     * @return the statement text
     */
    @VisibleForTesting
    protected static String getInsertStatement(ObjectIdentifier objectIdentifier, String str, Map<String, String> map) {
        final String target = generateTableWithDynamicOptions(objectIdentifier, map);
        return "INSERT INTO " + target + "\n" + str;
    }

    /**
     * Renders a table reference, optionally followed by an {@code OPTIONS} hint.
     *
     * <p>Every map entry is written as {@code 'key'='value'}; entries are separated by a comma
     * and a space. Keys and values are inserted as they are.
     *
     * @param objectIdentifier table to reference, rendered with its serializable string
     * @param map dynamic options; when empty only the table reference is returned
     * @return the table reference with the hint appended if there are options
     */
    private static String generateTableWithDynamicOptions(ObjectIdentifier objectIdentifier, Map<String, String> map) {
        final String tableReference = objectIdentifier.asSerializableString();
        if (map.isEmpty()) {
            return tableReference;
        }
        // The joining collector supplies the hint's opening and closing text as prefix/suffix.
        final String hint =
                map.entrySet().stream()
                        .map(option -> "'" + option.getKey() + "'='" + option.getValue() + "'")
                        .collect(Collectors.joining(", ", " /*+ OPTIONS(", ") */"));
        return tableReference + hint;
    }

    /**
     * Drains a result fetcher into a single list.
     *
     * <p>Fetching starts at token {@code 0} and continues with the token returned by each
     * fetched result set until that token is {@code null}.
     *
     * @param resultFetcher fetcher to read from
     * @return all rows in the order they were fetched
     */
    private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
        final List<RowData> rows = new ArrayList<>();
        Long token = 0L;
        do {
            ResultSet page = resultFetcher.fetchResults(token.longValue(), Integer.MAX_VALUE);
            rows.addAll(page.getData());
            token = page.getNextToken();
        } while (token != null);
        return rows;
    }

    /**
     * Submits a refresh script to the cluster selected by the session's execution target.
     *
     * <p>Targets ending with {@code application} are handled by the application-mode path; every
     * other accepted target is handled by the non-application path. A missing, empty or
     * {@code local} target is rejected.
     *
     * @param str the script to execute
     * @param configuration extra configuration for this execution
     * @return the result describing the submitted job
     * @throws ValidationException if the execution target is not supported
     */
    private static JobExecutionResult executeRefreshJob(String str, Configuration configuration, OperationExecutor operationExecutor, OperationHandle operationHandle) {
        final String target = operationExecutor.getSessionContext().getSessionConf().get(DeploymentOptions.TARGET);

        final boolean unsupported = target == null || target.isEmpty() || "local".equals(target);
        if (unsupported) {
            String message = String.format("Unsupported execution target detected: %s.Currently, only the following execution targets are supported: 'remote', 'yarn-session', 'yarn-application', 'kubernetes-session', 'kubernetes-application'. ", target);
            LOG.error(message);
            throw new ValidationException(message);
        }

        if (target.endsWith("application")) {
            return executeApplicationJob(str, configuration, operationExecutor);
        }
        return executeNonApplicationJob(str, configuration, operationExecutor, operationHandle);
    }

    /**
     * Executes a refresh script as a statement on the session's cluster.
     *
     * <p>The session cluster id is resolved before the statement is executed. The job id is read
     * from column 0 of the first row returned by the statement.
     *
     * @param str the script to execute
     * @param configuration extra configuration for this execution
     * @return the execution target, cluster id and job id of the submitted job
     * @throws ValidationException if the session has no cluster id
     */
    private static JobExecutionResult executeNonApplicationJob(String str, Configuration configuration, OperationExecutor operationExecutor, OperationHandle operationHandle) {
        final String target = operationExecutor.getSessionContext().getSessionConf().get(DeploymentOptions.TARGET);

        final String clusterId =
                operationExecutor
                        .getSessionClusterId()
                        .orElseThrow(
                                () -> {
                                    String message = String.format("No cluster ID found when executing materialized table refresh job. Execution target is : %s", target);
                                    LOG.error(message);
                                    return new ValidationException(message);
                                });

        final List<RowData> rows = fetchAllResults(operationExecutor.executeStatement(operationHandle, configuration, str));
        final String jobId = rows.get(0).getString(0).toString();
        return new JobExecutionResult(target, clusterId, jobId);
    }

    /**
     * Deploys a refresh script through {@code ApplicationClusterDeployer} with {@code SqlDriver}
     * as the entry class.
     *
     * <p>The script is passed as the SQL-script program argument. The deployment configuration is
     * the session configuration overlaid with the given configuration, plus a freshly generated
     * job id stored under {@code PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID}.
     *
     * @param str the script to deploy
     * @param configuration extra configuration layered on top of the session configuration
     * @return the execution target, the string form of the deployer's result and the generated
     *     job id
     * @throws SqlGatewayException if deployment fails for any reason
     */
    private static JobExecutionResult executeApplicationJob(String str, Configuration configuration, OperationExecutor operationExecutor) {
        final List<String> programArgs = new ArrayList<>();
        programArgs.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
        programArgs.add(str);

        final Configuration deployConf = new Configuration(operationExecutor.getSessionContext().getSessionConf());
        deployConf.addAll(configuration);
        final JobID jobId = new JobID();
        deployConf.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toString());

        try {
            final String target = deployConf.get(DeploymentOptions.TARGET);
            final ApplicationConfiguration application = new ApplicationConfiguration(programArgs.toArray(new String[0]), SqlDriver.class.getName());
            final ApplicationClusterDeployer deployer = new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader());
            final String clusterId = deployer.run(deployConf, application).toString();
            return new JobExecutionResult(target, clusterId, jobId.toString());
        } catch (Throwable failure) {
            LOG.error("Failed to deploy script {} to application cluster.", str, failure);
            throw new SqlGatewayException("Failed to deploy script to cluster.", failure);
        }
    }

    /**
     * Maps an execution target to the configuration key that holds its cluster id.
     *
     * @param str execution target name
     * @return {@code yarn.application.id} for targets starting with {@code yarn},
     *     {@code kubernetes.cluster-id} for targets starting with {@code kubernetes}, otherwise
     *     an empty optional
     */
    private static Optional<String> getClusterIdKeyName(String str) {
        if (str.startsWith("yarn")) {
            return Optional.of("yarn.application.id");
        }
        if (str.startsWith("kubernetes")) {
            return Optional.of("kubernetes.cluster-id");
        }
        return Optional.empty();
    }
}
