package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.ArrayList;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.SinkCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.sinks.DataStreamTableSink;
import org.apache.flink.table.planner.sinks.TableSinkUtils;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.class */
/**
 * Base exec node that writes the rows of its single input into a legacy {@link TableSink}.
 *
 * <p>The node picks the translation strategy from the concrete sink interface
 * ({@link RetractStreamTableSink}, {@link UpsertStreamTableSink}, {@link AppendStreamTableSink},
 * any other {@link StreamTableSink}, or {@link DataStreamTableSink}) and, when the sink does not
 * consume the planner's internal data structures, inserts a generated conversion operator in
 * front of the sink.
 *
 * @param <T> the type of the records consumed by the wrapped {@link TableSink}
 */
public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object> implements MultipleTransformationTranslator<Object> {
    /** The legacy sink the input rows are emitted to. */
    protected final TableSink<T> tableSink;

    /** Upsert key field names handed to an {@link UpsertStreamTableSink}; may be {@code null}. */
    @Nullable
    protected final String[] upsertKeys;

    /** Whether the input contains changes other than inserts. */
    protected final boolean needRetraction;

    /** Whether this node is translated for a streaming (as opposed to batch) job. */
    protected final boolean isStreaming;

    /**
     * Creates the sink node.
     *
     * @param tableSink the legacy sink to write to
     * @param upsertKeys upsert key field names, or {@code null} if there are none
     * @param needRetraction whether the input is not insert-only
     * @param isStreaming whether the node is used in streaming mode
     * @param inputProperty the property of the single input of this node
     * @param outputType the output type passed on to {@link ExecNodeBase}
     * @param description the description passed on to {@link ExecNodeBase}
     */
    public CommonExecLegacySink(
            TableSink<T> tableSink,
            @Nullable String[] upsertKeys,
            boolean needRetraction,
            boolean isStreaming,
            InputProperty inputProperty,
            LogicalType outputType,
            String description) {
        super(Collections.singletonList(inputProperty), outputType, description);
        this.tableSink = tableSink;
        this.upsertKeys = upsertKeys;
        this.needRetraction = needRetraction;
        this.isStreaming = isStreaming;
    }

    /**
     * Translates this node into the transformation of the sink.
     *
     * <p>For a {@link StreamTableSink} the (possibly converted) input is handed to
     * {@code consumeDataStream} and the transformation of the returned {@link DataStreamSink} is
     * the result. For a {@link DataStreamTableSink} only the (possibly converted) input
     * transformation is returned.
     *
     * @param plannerBase the planner used to translate the input and to obtain the environment
     * @return the resulting transformation
     * @throws TableException if the sink type is unsupported, if the changes produced by the
     *     input are not accepted by the sink, or if {@code consumeDataStream} returns
     *     {@code null}
     */
    // The casts between Transformation<T>/Transformation<Object> and to StreamTableSink<T> cannot
    // be checked at runtime because of erasure; they only re-type references.
    @SuppressWarnings("unchecked")
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<Object> translateToPlanInternal(PlannerBase plannerBase) {
        if (!(tableSink instanceof StreamTableSink)) {
            if (tableSink instanceof DataStreamTableSink) {
                final boolean withChangeFlag = ((DataStreamTableSink<?>) tableSink).withChangeFlag();
                return (Transformation<Object>) translateToTransformation(plannerBase, withChangeFlag);
            }
            throw new TableException(String.format("Only Support StreamTableSink! However %s is not a StreamTableSink.", tableSink.getClass().getCanonicalName()));
        }

        final Transformation<T> sinkInput;
        if (tableSink instanceof RetractStreamTableSink) {
            sinkInput = translateToTransformation(plannerBase, true);
        } else if (tableSink instanceof UpsertStreamTableSink) {
            final UpsertStreamTableSink<?> upsertSink = (UpsertStreamTableSink<?>) tableSink;
            final boolean isAppendOnly = !needRetraction;
            upsertSink.setIsAppendOnly(isAppendOnly);
            // Keys may only be absent when the table is never updated.
            if (upsertKeys == null && !isAppendOnly) {
                throw new TableException("UpsertStreamTableSink requires that Table has a full primary keys if it is updated.");
            }
            upsertSink.setKeyFields(upsertKeys);
            sinkInput = translateToTransformation(plannerBase, true);
        } else if (tableSink instanceof AppendStreamTableSink) {
            if (needRetraction) {
                throw new TableException("AppendStreamTableSink requires that Table has only insert changes.");
            }
            sinkInput = translateToTransformation(plannerBase, false);
        } else {
            // A plain StreamTableSink is only accepted outside of streaming mode.
            if (isStreaming) {
                throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.");
            }
            sinkInput = translateToTransformation(plannerBase, false);
        }

        final DataStream<T> dataStream = new DataStream<>(plannerBase.getExecEnv(), sinkInput);
        final DataStreamSink<?> dataStreamSink = ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream);
        if (dataStreamSink == null) {
            throw new TableException(String.format("The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, %s doesn't implement this method.", tableSink.getClass().getCanonicalName()));
        }
        return (Transformation<Object>) dataStreamSink.getTransformation();
    }

    /**
     * Validates the row type of the input and returns the row type that is used for the
     * conversion to the sink's consumed type. The exact checks are defined by the subclass.
     *
     * @param rowType the output row type of the input edge
     * @return the row type to convert from
     */
    protected abstract RowType checkAndConvertInputTypeIfNeeded(RowType rowType);

    /**
     * Translates the input and, if required, appends an operator that converts the input rows to
     * the data type consumed by the sink.
     *
     * <p>The input transformation is returned unchanged when
     * {@link CodeGenUtils#isInternalClass} accepts the consumed data type of the sink.
     *
     * @param plannerBase the planner used to translate the input
     * @param withChangeFlag whether the emitted records carry a change flag
     * @return the transformation producing the records for the sink
     * @throws TableException if no change flag is requested although the input is not insert-only
     */
    // OneInputTransformation is left raw on purpose: the generic signatures of the Scala
    // SinkCodeGenerator helpers are not visible from this class, so the element types cannot be
    // spelled out safely here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Transformation<T> translateToTransformation(PlannerBase plannerBase, boolean withChangeFlag) {
        if (!withChangeFlag && needRetraction) {
            throw new TableException("Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);
        final Transformation<T> inputTransform = (Transformation<T>) inputEdge.translateToPlan(plannerBase);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();
        // Run the subclass check before the shortcut below so that it is always applied.
        final RowType convertedInputRowType = checkAndConvertInputTypeIfNeeded(inputRowType);

        final DataType consumedDataType = tableSink.getConsumedDataType();
        if (CodeGenUtils.isInternalClass(consumedDataType)) {
            return inputTransform;
        }

        final int rowtimeIndex = getRowtimeIndex(inputRowType);
        final DataType physicalOutputType =
                TableSinkUtils.inferSinkPhysicalDataType(consumedDataType, convertedInputRowType, withChangeFlag);
        final String conversionName = "SinkConversionTo" + consumedDataType.getConversionClass().getSimpleName();
        return new OneInputTransformation(
                inputTransform,
                conversionName,
                SinkCodeGenerator.generateRowConverterOperator(
                        new CodeGeneratorContext(plannerBase.getTableConfig()),
                        convertedInputRowType,
                        tableSink,
                        physicalOutputType,
                        withChangeFlag,
                        "SinkConversion",
                        rowtimeIndex),
                SinkCodeGenerator.deriveSinkOutputTypeInfo(tableSink, physicalOutputType, withChangeFlag),
                inputTransform.getParallelism());
    }

    /**
     * Finds the position of the rowtime field of the given row type.
     *
     * @param rowType the row type to inspect
     * @return the index of the rowtime field if there is exactly one, otherwise {@code -1}
     */
    private int getRowtimeIndex(RowType rowType) {
        int rowtimeIndex = -1;
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            if (TypeCheckUtils.isRowTime(rowType.getTypeAt(i))) {
                if (rowtimeIndex != -1) {
                    // A second rowtime field makes the choice ambiguous.
                    return -1;
                }
                rowtimeIndex = i;
            }
        }
        return rowtimeIndex;
    }
}
