package org.apache.flink.table.runtime.operators.python.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.python.AbstractEmbeddedStatelessFunctionOperator;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import pemja.core.object.PyIterator;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.class */
public class EmbeddedPythonTableFunctionOperator extends AbstractEmbeddedStatelessFunctionOperator {
    private static final long serialVersionUID = 1;
    private final PythonFunctionInfo tableFunction;
    private final FlinkJoinType joinType;
    private GenericRowData reuseNullResultRowData;
    private transient JoinedRowData reuseJoinedRow;

    public EmbeddedPythonTableFunctionOperator(Configuration configuration, PythonFunctionInfo pythonFunctionInfo, RowType rowType, RowType rowType2, RowType rowType3, FlinkJoinType flinkJoinType, int[] iArr) {
        super(configuration, rowType, rowType2, rowType3, iArr);
        this.tableFunction = (PythonFunctionInfo) Preconditions.checkNotNull(pythonFunctionInfo);
        Preconditions.checkArgument(flinkJoinType == FlinkJoinType.INNER || flinkJoinType == FlinkJoinType.LEFT, "The join type should be inner join or left join");
        this.joinType = flinkJoinType;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractEmbeddedStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.reuseJoinedRow = new JoinedRowData();
        this.reuseNullResultRowData = new GenericRowData(this.udfOutputType.getFieldCount());
        for (int i = 0; i < this.udfOutputType.getFieldCount(); i++) {
            this.reuseNullResultRowData.setField(i, (Object) null);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator
    public void openPythonInterpreter() {
        this.interpreter.exec("from pyflink.fn_execution.embedded.operation_utils import create_table_operation_from_proto");
        this.interpreter.set("input_coder_proto", ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(this.udfInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false).toByteArray());
        this.interpreter.set("output_coder_proto", ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(this.udfOutputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false).toByteArray());
        this.interpreter.set("proto", ProtoUtils.createUserDefinedFunctionsProto(getRuntimeContext(), new PythonFunctionInfo[]{this.tableFunction}, ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue()).toByteArray());
        this.interpreter.exec("table_operation = create_table_operation_from_proto(proto,input_coder_proto,output_coder_proto)");
        this.interpreter.invokeMethod("table_operation", "open", new Object[0]);
    }

    public void endInput() {
        if (this.interpreter != null) {
            this.interpreter.invokeMethod("table_operation", "close", new Object[0]);
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        for (int i = 0; i < this.userDefinedFunctionInputArgs.length; i++) {
            this.userDefinedFunctionInputArgs[i] = this.userDefinedFunctionInputConverters[i].toExternal(rowData, this.udfInputOffsets[i]);
        }
        PyIterator pyIterator = (PyIterator) this.interpreter.invokeMethod("table_operation", "process_element", this.userDefinedFunctionInputArgs);
        if (!pyIterator.hasNext()) {
            if (this.joinType == FlinkJoinType.LEFT) {
                this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(rowData, this.reuseNullResultRowData));
            }
            pyIterator.close();
        }
        do {
            Object[] objArr = (Object[]) pyIterator.next();
            for (int i2 = 0; i2 < objArr.length; i2++) {
                this.reuseResultRowData.setField(i2, this.userDefinedFunctionOutputConverters[i2].toInternal(objArr[i2]));
            }
            this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(rowData, this.reuseResultRowData));
        } while (pyIterator.hasNext());
        pyIterator.close();
    }
}
