/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.Conversions;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.data.TimeConversions;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.generic.GenericData;
import org.apache.seatunnel.shade.connector.file.org.apache.avro.generic.GenericRecord;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.avro.AvroParquetReader;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.example.data.simple.NanoTime;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.ParquetReader;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.io.api.Binary;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.GroupType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.MessageType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.OriginalType;
import org.apache.seatunnel.shade.connector.file.org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetReadStrategy
extends AbstractReadStrategy {
    private static final Logger log = LoggerFactory.getLogger(ParquetReadStrategy.class);
    private static final byte[] PARQUET_MAGIC = new byte[]{80, 65, 82, 49};
    private static final long NANOS_PER_MILLISECOND = 1000000L;
    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
    private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588L;
    private int[] indexes;

    @Override
    public void read(String path, Collector<SeaTunnelRow> output) throws FileConnectorException, IOException {
        if (Boolean.FALSE.equals(this.checkFileType(path))) {
            String errorMsg = String.format("This file [%s] is not a parquet file, please check the format of this file", path);
            throw new FileConnectorException((SeaTunnelErrorCode)FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
        }
        Path filePath = new Path(path);
        Map<String, String> partitionsMap = this.parsePartitionsByPath(path);
        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, this.getConfiguration());
        int fieldsCount = this.seaTunnelRowType.getTotalFields();
        GenericData dataModel = new GenericData();
        dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
        try (ParquetReader reader = AvroParquetReader.builder(hadoopInputFile).withDataModel(dataModel).build();){
            GenericRecord record;
            while ((record = (GenericRecord)reader.read()) != null) {
                Object[] fields;
                if (this.isMergePartition) {
                    int index = fieldsCount;
                    fields = new Object[fieldsCount + partitionsMap.size()];
                    for (String value : partitionsMap.values()) {
                        fields[index++] = value;
                    }
                } else {
                    fields = new Object[fieldsCount];
                }
                for (int i = 0; i < fieldsCount; ++i) {
                    Object data = record.get(this.indexes[i]);
                    fields[i] = this.resolveObject(data, this.seaTunnelRowType.getFieldType(i));
                }
                SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
                output.collect((Object)seaTunnelRow);
            }
        }
    }

    private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
        if (field == null) {
            return null;
        }
        switch (fieldType.getSqlType()) {
            case ARRAY: {
                ArrayList origArray = new ArrayList();
                ((GenericData.Array)field).iterator().forEachRemaining(origArray::add);
                BasicType elementType = ((ArrayType)fieldType).getElementType();
                switch (elementType.getSqlType()) {
                    case STRING: {
                        return origArray.toArray(TYPE_ARRAY_STRING);
                    }
                    case BOOLEAN: {
                        return origArray.toArray(TYPE_ARRAY_BOOLEAN);
                    }
                    case TINYINT: {
                        return origArray.toArray(TYPE_ARRAY_BYTE);
                    }
                    case SMALLINT: {
                        return origArray.toArray(TYPE_ARRAY_SHORT);
                    }
                    case INT: {
                        return origArray.toArray(TYPE_ARRAY_INTEGER);
                    }
                    case BIGINT: {
                        return origArray.toArray(TYPE_ARRAY_LONG);
                    }
                    case FLOAT: {
                        return origArray.toArray(TYPE_ARRAY_FLOAT);
                    }
                    case DOUBLE: {
                        return origArray.toArray(TYPE_ARRAY_DOUBLE);
                    }
                }
                String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType());
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
            }
            case MAP: {
                HashMap dataMap = new HashMap();
                SeaTunnelDataType keyType = ((MapType)fieldType).getKeyType();
                SeaTunnelDataType valueType = ((MapType)fieldType).getValueType();
                HashMap origDataMap = (HashMap)field;
                origDataMap.forEach((key, value) -> dataMap.put(this.resolveObject(key, keyType), this.resolveObject(value, valueType)));
                return dataMap;
            }
            case BOOLEAN: 
            case INT: 
            case BIGINT: 
            case FLOAT: 
            case DOUBLE: 
            case DECIMAL: 
            case DATE: {
                return field;
            }
            case STRING: {
                return field.toString();
            }
            case TINYINT: {
                return Byte.parseByte(field.toString());
            }
            case SMALLINT: {
                return Short.parseShort(field.toString());
            }
            case NULL: {
                return null;
            }
            case BYTES: {
                ByteBuffer buffer = (ByteBuffer)field;
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes, 0, bytes.length);
                return bytes;
            }
            case TIMESTAMP: {
                if (field instanceof GenericData.Fixed) {
                    Binary binary = Binary.fromConstantByteArray(((GenericData.Fixed)field).bytes());
                    NanoTime nanoTime = NanoTime.fromBinary(binary);
                    int julianDay = nanoTime.getJulianDay();
                    long nanosOfDay = nanoTime.getTimeOfDayNanos();
                    long timestamp = ((long)julianDay - 2440588L) * MILLIS_PER_DAY + nanosOfDay / 1000000L;
                    return new Timestamp(timestamp).toLocalDateTime();
                }
                Instant instant = Instant.ofEpochMilli((Long)field);
                return LocalDateTime.ofInstant(instant, ZoneId.of("+8"));
            }
            case ROW: {
                SeaTunnelRowType rowType = (SeaTunnelRowType)fieldType;
                Object[] objects = new Object[rowType.getTotalFields()];
                for (int i = 0; i < rowType.getTotalFields(); ++i) {
                    SeaTunnelDataType dataType = rowType.getFieldType(i);
                    objects[i] = this.resolveObject(((GenericRecord)field).get(i), dataType);
                }
                return new SeaTunnelRow(objects);
            }
        }
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel not support this data type now");
    }

    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FileConnectorException {
        ParquetMetadata metadata;
        Path filePath = new Path(path);
        try {
            HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, this.getConfiguration(hadoopConf));
            ParquetFileReader reader = ParquetFileReader.open(hadoopInputFile);
            metadata = reader.getFooter();
            reader.close();
        }
        catch (IOException e) {
            String errorMsg = String.format("Create parquet reader for this file [%s] failed", path);
            throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e);
        }
        FileMetaData fileMetaData = metadata.getFileMetaData();
        MessageType originalSchema = fileMetaData.getSchema();
        if (this.readColumns.isEmpty()) {
            for (int i = 0; i < originalSchema.getFieldCount(); ++i) {
                this.readColumns.add(originalSchema.getFieldName(i));
            }
        }
        String[] fields = new String[this.readColumns.size()];
        SeaTunnelDataType[] types = new SeaTunnelDataType[this.readColumns.size()];
        this.indexes = new int[this.readColumns.size()];
        for (int i = 0; i < this.readColumns.size(); ++i) {
            int fieldIndex;
            fields[i] = (String)this.readColumns.get(i);
            Type type = originalSchema.getType(fields[i]);
            this.indexes[i] = fieldIndex = originalSchema.getFieldIndex(fields[i]);
            types[i] = this.parquetType2SeaTunnelType(type);
        }
        this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
        this.seaTunnelRowTypeWithPartition = this.mergePartitionTypes(path, this.seaTunnelRowType);
        return this.getActualSeaTunnelRowTypeInfo();
    }

    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
        if (type.isPrimitive()) {
            switch (type.asPrimitiveType().getPrimitiveTypeName()) {
                case INT32: {
                    OriginalType originalType = type.asPrimitiveType().getOriginalType();
                    if (originalType == null) {
                        return BasicType.INT_TYPE;
                    }
                    switch (type.asPrimitiveType().getOriginalType()) {
                        case INT_8: {
                            return BasicType.BYTE_TYPE;
                        }
                        case INT_16: {
                            return BasicType.SHORT_TYPE;
                        }
                        case DATE: {
                            return LocalTimeType.LOCAL_DATE_TYPE;
                        }
                    }
                    String errorMsg = String.format("Not support this type [%s]", type);
                    throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
                }
                case INT64: {
                    if (type.asPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MILLIS) {
                        return LocalTimeType.LOCAL_DATE_TIME_TYPE;
                    }
                    return BasicType.LONG_TYPE;
                }
                case INT96: {
                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
                }
                case BINARY: {
                    if (type.asPrimitiveType().getOriginalType() == null) {
                        return PrimitiveByteArrayType.INSTANCE;
                    }
                    return BasicType.STRING_TYPE;
                }
                case FLOAT: {
                    return BasicType.FLOAT_TYPE;
                }
                case DOUBLE: {
                    return BasicType.DOUBLE_TYPE;
                }
                case BOOLEAN: {
                    return BasicType.BOOLEAN_TYPE;
                }
                case FIXED_LEN_BYTE_ARRAY: {
                    if (type.getLogicalTypeAnnotation() == null) {
                        return LocalTimeType.LOCAL_DATE_TIME_TYPE;
                    }
                    String typeInfo = type.getLogicalTypeAnnotation().toString().replaceAll(SqlType.DECIMAL.toString(), "").replaceAll("\\(", "").replaceAll("\\)", "");
                    String[] splits = typeInfo.split(",");
                    int precision = Integer.parseInt(splits[0]);
                    int scale = Integer.parseInt(splits[1]);
                    return new DecimalType(precision, scale);
                }
            }
            String errorMsg = String.format("Not support this type [%s]", type);
            throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
        }
        LogicalTypeAnnotation logicalTypeAnnotation = type.asGroupType().getLogicalTypeAnnotation();
        if (logicalTypeAnnotation == null) {
            List<Type> fields = type.asGroupType().getFields();
            String[] fieldNames = new String[fields.size()];
            SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[fields.size()];
            for (int i = 0; i < fields.size(); ++i) {
                Type fieldType = fields.get(i);
                SeaTunnelDataType<?> seaTunnelDataType = this.parquetType2SeaTunnelType(fields.get(i));
                fieldNames[i] = fieldType.getName();
                seaTunnelDataTypes[i] = seaTunnelDataType;
            }
            return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
        }
        switch (logicalTypeAnnotation.toOriginalType()) {
            case MAP: {
                GroupType groupType = type.asGroupType().getType(0).asGroupType();
                SeaTunnelDataType<?> keyType = this.parquetType2SeaTunnelType(groupType.getType(0));
                SeaTunnelDataType<?> valueType = this.parquetType2SeaTunnelType(groupType.getType(1));
                return new MapType(keyType, valueType);
            }
            case LIST: {
                Type elementType;
                try {
                    elementType = type.asGroupType().getType(0).asGroupType().getType(0);
                }
                catch (Exception e) {
                    elementType = type.asGroupType().getType(0);
                }
                SeaTunnelDataType<?> fieldType = this.parquetType2SeaTunnelType(elementType);
                switch (fieldType.getSqlType()) {
                    case STRING: {
                        return ArrayType.STRING_ARRAY_TYPE;
                    }
                    case BOOLEAN: {
                        return ArrayType.BOOLEAN_ARRAY_TYPE;
                    }
                    case TINYINT: {
                        return ArrayType.BYTE_ARRAY_TYPE;
                    }
                    case SMALLINT: {
                        return ArrayType.SHORT_ARRAY_TYPE;
                    }
                    case INT: {
                        return ArrayType.INT_ARRAY_TYPE;
                    }
                    case BIGINT: {
                        return ArrayType.LONG_ARRAY_TYPE;
                    }
                    case FLOAT: {
                        return ArrayType.FLOAT_ARRAY_TYPE;
                    }
                    case DOUBLE: {
                        return ArrayType.DOUBLE_ARRAY_TYPE;
                    }
                }
                String errorMsg = String.format("SeaTunnel array type not supported this genericType [%s] yet", fieldType);
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
            }
        }
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel file connector not support this nest type");
    }

    @Override
    boolean checkFileType(String path) {
        byte[] magic = new byte[PARQUET_MAGIC.length];
        try {
            Configuration configuration = this.getConfiguration();
            FileSystem fileSystem = FileSystem.get((Configuration)configuration);
            Path filePath = new Path(path);
            FSDataInputStream in = fileSystem.open(filePath);
            in.seek(0L);
            in.readFully(magic);
            boolean checkResult = Arrays.equals(magic, PARQUET_MAGIC);
            in.close();
            return checkResult;
        }
        catch (IOException e) {
            String errorMsg = String.format("Check parquet file [%s] failed", path);
            throw new FileConnectorException((SeaTunnelErrorCode)FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
        }
    }
}

