package org.apache.seatunnel.connectors.seatunnel.hudi.source;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.class */
public class HudiSourceReader implements SourceReader<SeaTunnelRow, HudiSourceSplit> {
    private static final long THREAD_WAIT_TIME = 500;
    private final String confPaths;
    private final Set<HudiSourceSplit> sourceSplits = new HashSet();
    private final SourceReader.Context context;
    private final SeaTunnelRowType seaTunnelRowType;

    public HudiSourceReader(String str, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
        this.confPaths = str;
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    public void open() {
    }

    public void close() {
    }

    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        if (this.sourceSplits.isEmpty()) {
            Thread.sleep(500L);
            return;
        }
        JobConf jobConf = HudiUtil.toJobConf(HudiUtil.getConfiguration(this.confPaths));
        this.sourceSplits.forEach(hudiSourceSplit -> {
            try {
                RecordReader<NullWritable, ArrayWritable> recordReader = new HoodieParquetInputFormat().getRecordReader(hudiSourceSplit.getInputSplit(), jobConf, Reporter.NULL);
                ParquetHiveSerDe parquetHiveSerDe = new ParquetHiveSerDe();
                Properties properties = new Properties();
                ArrayList arrayList = new ArrayList();
                for (SeaTunnelDataType seaTunnelDataType : this.seaTunnelRowType.getFieldTypes()) {
                    arrayList.add(seaTunnelDataType.getSqlType().name());
                }
                String join = StringUtils.join(this.seaTunnelRowType.getFieldNames(), ",");
                String lowerCase = StringUtils.join(arrayList, ",").toLowerCase(Locale.ROOT);
                properties.setProperty("columns", join);
                properties.setProperty("columns.types", lowerCase);
                parquetHiveSerDe.initialize(jobConf, properties);
                StructObjectInspector objectInspector = parquetHiveSerDe.getObjectInspector();
                List allStructFieldRefs = objectInspector.getAllStructFieldRefs();
                NullWritable createKey = recordReader.createKey();
                ArrayWritable createValue = recordReader.createValue();
                while (recordReader.next(createKey, createValue)) {
                    Object[] objArr = new Object[allStructFieldRefs.size()];
                    for (int i = 0; i < allStructFieldRefs.size(); i++) {
                        Object structFieldData = objectInspector.getStructFieldData(createValue, (StructField) allStructFieldRefs.get(i));
                        if (null != structFieldData) {
                            objArr[i] = String.valueOf(structFieldData);
                        } else {
                            objArr[i] = null;
                        }
                    }
                    collector.collect(new SeaTunnelRow(objArr));
                }
                recordReader.close();
            } catch (Exception e) {
                throw new HudiConnectorException((SeaTunnelErrorCode) CommonErrorCode.READER_OPERATION_FAILED, (Throwable) e);
            }
        });
        this.context.signalNoMoreElement();
    }

    public List<HudiSourceSplit> snapshotState(long j) {
        return new ArrayList(this.sourceSplits);
    }

    public void addSplits(List<HudiSourceSplit> list) {
        this.sourceSplits.addAll(list);
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long j) {
    }
}
