package org.apache.paimon.flink.action.cdc.mysql;

import io.debezium.relational.history.TableChanges;
import java.lang.invoke.SerializedLambda;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.class */
public class MySqlDebeziumJsonEventParser implements EventParser<String> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
    private final ObjectMapper objectMapper;
    private final ZoneId serverTimeZone;
    private final boolean caseSensitive;
    private final TableNameConverter tableNameConverter;
    private final List<ComputedColumn> computedColumns;
    private final NewTableSchemaBuilder<JsonNode> schemaBuilder;

    @Nullable
    private final Pattern includingPattern;

    @Nullable
    private final Pattern excludingPattern;
    private final Set<String> includedTables;
    private final Set<String> excludedTables;
    private final TypeMapping typeMapping;
    private JsonNode root;
    private JsonNode payload;
    private String currentTable;
    private boolean shouldSynchronizeCurrentTable;

    public MySqlDebeziumJsonEventParser(ZoneId zoneId, boolean z, List<ComputedColumn> list, TypeMapping typeMapping) {
        this(zoneId, z, list, new TableNameConverter(z), jsonNode -> {
            return Optional.empty();
        }, null, null, typeMapping);
    }

    public MySqlDebeziumJsonEventParser(ZoneId zoneId, boolean z, TableNameConverter tableNameConverter, NewTableSchemaBuilder<JsonNode> newTableSchemaBuilder, @Nullable Pattern pattern, @Nullable Pattern pattern2, TypeMapping typeMapping) {
        this(zoneId, z, Collections.emptyList(), tableNameConverter, newTableSchemaBuilder, pattern, pattern2, typeMapping);
    }

    public MySqlDebeziumJsonEventParser(ZoneId zoneId, boolean z, List<ComputedColumn> list, TableNameConverter tableNameConverter, NewTableSchemaBuilder<JsonNode> newTableSchemaBuilder, @Nullable Pattern pattern, @Nullable Pattern pattern2, TypeMapping typeMapping) {
        this.objectMapper = new ObjectMapper();
        this.includedTables = new HashSet();
        this.excludedTables = new HashSet();
        this.serverTimeZone = zoneId;
        this.caseSensitive = z;
        this.computedColumns = list;
        this.tableNameConverter = tableNameConverter;
        this.schemaBuilder = newTableSchemaBuilder;
        this.includingPattern = pattern;
        this.excludingPattern = pattern2;
        this.typeMapping = typeMapping;
    }

    @Override // org.apache.paimon.flink.sink.cdc.EventParser
    public void setRawEvent(String str) {
        try {
            this.root = (JsonNode) this.objectMapper.readValue(str, JsonNode.class);
            this.payload = this.root.get("payload");
            this.currentTable = this.payload.get("source").get("table").asText();
            this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.paimon.flink.sink.cdc.EventParser
    public String parseTableName() {
        return this.tableNameConverter.convert(Identifier.create(getDatabaseName(), this.currentTable));
    }

    private boolean isSchemaChange() {
        return this.payload.get("op") == null;
    }

    @Override // org.apache.paimon.flink.sink.cdc.EventParser
    public List<DataField> parseSchemaChange() {
        if (!this.shouldSynchronizeCurrentTable || !isSchemaChange()) {
            return Collections.emptyList();
        }
        JsonNode jsonNode = this.payload.get("historyRecord");
        if (jsonNode == null) {
            return Collections.emptyList();
        }
        try {
            JsonNode jsonNode2 = this.objectMapper.readTree(jsonNode.asText()).get("tableChanges");
            if (jsonNode2.size() != 1) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + jsonNode.asText());
                return Collections.emptyList();
            }
            JsonNode jsonNode3 = jsonNode2.get(0).get("table").get("columns");
            if (jsonNode3 == null) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < jsonNode3.size(); i++) {
                JsonNode jsonNode4 = jsonNode3.get(i);
                JsonNode jsonNode5 = jsonNode4.get("length");
                JsonNode jsonNode6 = jsonNode4.get("scale");
                DataType dataType = MySqlTypeUtils.toDataType(jsonNode4.get("typeName").asText(), jsonNode5 == null ? null : Integer.valueOf(jsonNode5.asInt()), jsonNode6 == null ? null : Integer.valueOf(jsonNode6.asInt()), this.typeMapping);
                if (!this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE)) {
                    dataType = dataType.copy(jsonNode4.get("optional").asBoolean());
                }
                String asText = jsonNode4.get("name").asText();
                arrayList.add(new DataField(i, this.caseSensitive ? asText : asText.toLowerCase(), dataType));
            }
            return arrayList;
        } catch (Exception e) {
            LOG.info("Failed to parse history record for schema changes", e);
            return Collections.emptyList();
        }
    }

    @Override // org.apache.paimon.flink.sink.cdc.EventParser
    public Optional<Schema> parseNewTable() {
        JsonNode jsonNode;
        if (this.shouldSynchronizeCurrentTable && (jsonNode = this.payload.get("historyRecord")) != null) {
            try {
                JsonNode jsonNode2 = this.objectMapper.readTree(jsonNode.asText()).get("tableChanges");
                if (jsonNode2.size() != 1) {
                    LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + jsonNode.asText());
                    return Optional.empty();
                }
                JsonNode jsonNode3 = jsonNode2.get(0);
                if (!jsonNode3.get("type").asText().equals(TableChanges.TableChangeType.CREATE.name())) {
                    return Optional.empty();
                }
                if (jsonNode3.get("table").get("primaryKeyColumnNames").size() != 0) {
                    return this.schemaBuilder.build(jsonNode3);
                }
                LOG.debug("Didn't find primary keys from MySQL DDL for table '{}'. This table won't be synchronized.", this.currentTable);
                this.excludedTables.add(this.currentTable);
                this.shouldSynchronizeCurrentTable = false;
                return Optional.empty();
            } catch (Exception e) {
                LOG.info("Failed to parse history record for schema changes", e);
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    @Override // org.apache.paimon.flink.sink.cdc.EventParser
    public List<CdcRecord> parseRecords() {
        if (!this.shouldSynchronizeCurrentTable || isSchemaChange()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Map<String, String> extractRow = extractRow(this.payload.get("before"));
        if (extractRow.size() > 0) {
            arrayList.add(new CdcRecord(RowKind.DELETE, this.caseSensitive ? extractRow : keyCaseInsensitive(extractRow)));
        }
        Map<String, String> extractRow2 = extractRow(this.payload.get("after"));
        if (extractRow2.size() > 0) {
            arrayList.add(new CdcRecord(RowKind.INSERT, this.caseSensitive ? extractRow2 : keyCaseInsensitive(extractRow2)));
        }
        return arrayList;
    }

    private String getDatabaseName() {
        return this.payload.get("source").get("db").asText();
    }

    /* JADX WARN: Type inference failed for: r0v111, types: [java.time.LocalDateTime] */
    /* JADX WARN: Type inference failed for: r0v96, types: [java.time.LocalDateTime] */
    private Map<String, String> extractRow(JsonNode jsonNode) {
        JsonNode jsonNode2 = (JsonNode) Preconditions.checkNotNull(this.root.get("schema"), "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        JsonNode jsonNode3 = jsonNode2.get(CoreOptions.FIELDS_PREFIX);
        for (int i = 0; i < jsonNode3.size(); i++) {
            JsonNode jsonNode4 = jsonNode3.get(i);
            String asText = jsonNode4.get("field").asText();
            if ("before".equals(asText) || "after".equals(asText)) {
                JsonNode jsonNode5 = jsonNode4.get(CoreOptions.FIELDS_PREFIX);
                for (int i2 = 0; i2 < jsonNode5.size(); i2++) {
                    JsonNode jsonNode6 = jsonNode5.get(i2);
                    String asText2 = jsonNode6.get("field").asText();
                    hashMap.put(asText2, jsonNode6.get("type").asText());
                    if (jsonNode6.get("name") != null) {
                        hashMap2.put(asText2, jsonNode6.get("name").asText());
                    }
                }
            }
        }
        Map map = (Map) this.objectMapper.convertValue(jsonNode, new TypeReference<Map<String, Object>>() { // from class: org.apache.paimon.flink.action.cdc.mysql.MySqlDebeziumJsonEventParser.1
        });
        if (map == null) {
            return new HashMap();
        }
        HashMap hashMap3 = new HashMap();
        for (Map.Entry entry : hashMap.entrySet()) {
            String str = (String) entry.getKey();
            String str2 = (String) entry.getValue();
            Object obj = map.get(str);
            if (obj != null) {
                String str3 = (String) hashMap2.get(str);
                String obj2 = obj.toString();
                String str4 = obj2;
                if ("io.debezium.data.Bits".equals(str3)) {
                    byte[] decode = Base64.getDecoder().decode(obj2);
                    byte[] bArr = new byte[decode.length];
                    for (int i3 = 0; i3 < decode.length; i3++) {
                        bArr[i3] = decode[(decode.length - 1) - i3];
                    }
                    str4 = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bArr) : Base64.getEncoder().encodeToString(bArr);
                } else if ("bytes".equals(str2) && str3 == null) {
                    str4 = new String(Base64.getDecoder().decode(obj2));
                } else if ("bytes".equals(str2) && "org.apache.flink.kafka.shaded.org.apache.kafka.connect.data.Decimal".equals(str3)) {
                    try {
                        new BigDecimal(obj2);
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + obj2 + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set 'decimal.format' to 'numeric'", e);
                    }
                } else if ("io.debezium.time.Date".equals(str3)) {
                    str4 = DateTimeUtils.toLocalDate(Integer.parseInt(obj2)).toString();
                } else if ("io.debezium.time.Timestamp".equals(str3)) {
                    str4 = DateTimeUtils.formatLocalDateTime(DateTimeUtils.toLocalDateTime(Long.parseLong(obj2), ZoneOffset.UTC), 3);
                } else if ("io.debezium.time.MicroTimestamp".equals(str3)) {
                    long parseLong = Long.parseLong(obj2);
                    str4 = DateTimeUtils.formatLocalDateTime(Instant.ofEpochSecond(parseLong / 1000000, (parseLong % 1000000) * 1000).atZone(ZoneOffset.UTC).toLocalDateTime(), 6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(str3)) {
                    str4 = DateTimeUtils.formatLocalDateTime(Instant.parse(obj2).atZone(this.serverTimeZone).toLocalDateTime(), 6);
                } else if ("io.debezium.time.MicroTime".equals(str3)) {
                    long parseLong2 = Long.parseLong(obj2);
                    str4 = Instant.ofEpochSecond(parseLong2 / 1000000, (parseLong2 % 1000000) * 1000).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("io.debezium.data.geometry.Point".equals(str3) || "io.debezium.data.geometry.Geometry".equals(str3)) {
                    JsonNode jsonNode7 = jsonNode.get(str);
                    try {
                        str4 = MySqlTypeUtils.convertWkbArray(jsonNode7.get("wkb").binaryValue());
                    } catch (Exception e2) {
                        throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", jsonNode7), e2);
                    }
                }
                hashMap3.put(str, str4);
            }
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            hashMap3.put(computedColumn.columnName(), computedColumn.eval((String) hashMap3.get(computedColumn.fieldReference())));
        }
        return hashMap3;
    }

    private Map<String, String> keyCaseInsensitive(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String lowerCase = entry.getKey().toLowerCase();
            Preconditions.checkArgument(!hashMap.containsKey(lowerCase), "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s", map);
            hashMap.put(lowerCase, entry.getValue());
        }
        return hashMap;
    }

    private boolean shouldSynchronizeCurrentTable() {
        if (this.excludedTables.contains(this.currentTable)) {
            return false;
        }
        if (this.includedTables.contains(this.currentTable)) {
            return true;
        }
        boolean z = true;
        if (this.includingPattern != null) {
            z = this.includingPattern.matcher(this.currentTable).matches();
        }
        if (this.excludingPattern != null) {
            z = z && !this.excludingPattern.matcher(this.currentTable).matches();
        }
        if (z) {
            this.includedTables.add(this.currentTable);
            return true;
        }
        LOG.debug("Source table {} won't be synchronized because it was excluded. ", this.currentTable);
        this.excludedTables.add(this.currentTable);
        return false;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2005429802:
                if (implMethodName.equals("lambda$new$b05ea5a7$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder") && serializedLambda.getFunctionalInterfaceMethodName().equals("build") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Optional;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/shade/jackson2/com/fasterxml/jackson/databind/JsonNode;)Ljava/util/Optional;")) {
                    return jsonNode -> {
                        return Optional.empty();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
