package io.confluent.flink.plugin.internal;

import io.confluent.flink.plugin.internal.ConfluentOperations;
import io.confluent.flink.plugin.internal.PluginContext;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SinkModifyOperation;

/* loaded from: input_file:io/confluent/flink/plugin/internal/ConfluentExecutor.class */
class ConfluentExecutor implements Executor {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/flink/plugin/internal/ConfluentExecutor$ConfluentPipeline.class */
    public static class ConfluentPipeline implements Pipeline {
        final InternalEnvironment internalEnv;
        final List<ModifyOperation> operations;

        ConfluentPipeline(InternalEnvironment internalEnvironment, List<ModifyOperation> list) {
            this.internalEnv = internalEnvironment;
            this.operations = list;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/flink/plugin/internal/ConfluentExecutor$ConfluentTransformation.class */
    public static class ConfluentTransformation extends Transformation<Void> {
        private final InternalEnvironment internalEnv;
        private final List<ModifyOperation> operations;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ConfluentTransformation(InternalEnvironment internalEnvironment, List<ModifyOperation> list) {
            super("Confluent Transformation", Types.VOID, -1);
            this.internalEnv = internalEnvironment;
            this.operations = list;
        }

        public List<Transformation<?>> getTransitivePredecessorsInternal() {
            return List.of();
        }

        public List<Transformation<?>> getInputs() {
            return List.of();
        }

        ConfluentPipeline toPipeline() {
            return new ConfluentPipeline(this.internalEnv, this.operations);
        }
    }

    public ReadableConfig getConfiguration() {
        return new Configuration();
    }

    public Pipeline createPipeline(List<Transformation<?>> list, ReadableConfig readableConfig, @Nullable String str) {
        return createPipeline(list, readableConfig, str, List.of());
    }

    public Pipeline createPipeline(List<Transformation<?>> list, ReadableConfig readableConfig, @Nullable String str, List<JobStatusHook> list2) {
        if (list.size() == 1 || (list.get(0) instanceof ConfluentTransformation)) {
            return ((ConfluentTransformation) list.get(0)).toPipeline();
        }
        throw Utils.illegalState("Confluent-specific transformation expected.");
    }

    public JobExecutionResult execute(Pipeline pipeline) {
        throw Utils.unsupportedMethod(getClass(), "execute");
    }

    public JobClient executeAsync(Pipeline pipeline) {
        if (!(pipeline instanceof ConfluentPipeline)) {
            throw Utils.illegalState("Confluent-specific pipeline expected.");
        }
        ConfluentPipeline confluentPipeline = (ConfluentPipeline) pipeline;
        Configuration configuration = confluentPipeline.internalEnv.tableConfig;
        List<ModifyOperation> list = confluentPipeline.operations;
        if (list.size() > 1) {
            return executeModify(String.format("EXECUTE %s", Utils.toSql(list)), configuration, null);
        }
        SinkModifyOperation sinkModifyOperation = (ModifyOperation) list.get(0);
        if (sinkModifyOperation instanceof CollectModifyOperation) {
            return executeCollect((CollectModifyOperation) sinkModifyOperation, configuration);
        }
        if (!(sinkModifyOperation instanceof SinkModifyOperation)) {
            throw Utils.unsupportedFeature("operation", sinkModifyOperation.getClass());
        }
        SinkModifyOperation sinkModifyOperation2 = sinkModifyOperation;
        return executeModify(Utils.toSql(sinkModifyOperation2), configuration, toStatementName(sinkModifyOperation2));
    }

    public boolean isCheckpointingEnabled() {
        throw Utils.unsupportedMethod(getClass(), "isCheckpointingEnabled");
    }

    private JobClient executeCollect(CollectModifyOperation collectModifyOperation, Configuration configuration) {
        QueryOperation child = collectModifyOperation.getChild();
        String statementName = toStatementName(child);
        String asSerializableString = child.asSerializableString();
        PluginContext pluginContext = PluginContext.get(configuration);
        PluginContext.CollectResult queryCollect = pluginContext.queryCollect(configuration, asSerializableString, statementName);
        ResolvedSchema orElseThrow = queryCollect.getSchema().orElseThrow(() -> {
            return Utils.illegalState("Query operation should have a schema");
        });
        collectModifyOperation.setConsumedDataType(orElseThrow.toPhysicalRowDataType());
        collectModifyOperation.setSelectResultProvider(new CollectResultProvider(pluginContext, queryCollect.getStatementName(), orElseThrow, new RowConverter(configuration)));
        return new ConfluentJobClient(pluginContext, queryCollect.getStatementName());
    }

    private JobClient executeModify(String str, Configuration configuration, @Nullable String str2) {
        PluginContext pluginContext = PluginContext.get(configuration);
        return new ConfluentJobClient(pluginContext, pluginContext.querySuccess(configuration, str, str2).getStatementName());
    }

    @Nullable
    private static String toStatementName(Operation operation) {
        if (operation instanceof ConfluentOperations.ConfluentOperation) {
            return ((ConfluentOperations.ConfluentOperation) operation).getStatementName();
        }
        return null;
    }
}
