/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.table.serialization;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;

/**
 * Serialization schema that turns a Flink {@link RowData} changelog record into a MongoDB
 * {@link WriteModel} over {@link BsonDocument}.
 *
 * <p>The write model is chosen from the record's row kind:
 *
 * <ul>
 *   <li>{@code INSERT} / {@code UPDATE_AFTER}: an upsert keyed on {@code _id} when a primary key
 *       can be extracted, otherwise a plain insert.
 *   <li>{@code UPDATE_BEFORE} / {@code DELETE}: a delete keyed on {@code _id}.
 * </ul>
 */
@Internal
public class MongoRowDataSerializationSchema implements MongoSerializationSchema<RowData> {

    /** Name of the document field used as the filter key for upserts and deletes. */
    private static final String ID_FIELD = "_id";

    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
    private final Function<RowData, BsonValue> primaryKeyExtractor;
    private final Function<RowData, BsonDocument> shardKeysExtractor;

    /**
     * Creates the schema.
     *
     * @param rowDataToBsonConverter converts a whole row into the BSON document to be written
     * @param primaryKeyExtractor extracts the value used for {@code _id}; a {@code null} result
     *     makes upsert-kind records fall back to a plain insert
     * @param shardKeysExtractor extracts additional fields that are merged into the upsert
     *     filter; an empty document adds nothing
     */
    public MongoRowDataSerializationSchema(
            RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter,
            Function<RowData, BsonValue> primaryKeyExtractor,
            Function<RowData, BsonDocument> shardKeysExtractor) {
        this.rowDataToBsonConverter = rowDataToBsonConverter;
        this.primaryKeyExtractor = primaryKeyExtractor;
        this.shardKeysExtractor = shardKeysExtractor;
    }

    /**
     * Maps one changelog record to the MongoDB write operation that applies it.
     *
     * @param element the record to serialize
     * @param context sink context; not used by this implementation
     * @return an upsert/insert model for {@code INSERT} and {@code UPDATE_AFTER}, a delete model
     *     for {@code UPDATE_BEFORE} and {@code DELETE}
     * @throws TableException if the row kind is none of the four handled kinds
     */
    @Override
    public WriteModel<BsonDocument> serialize(RowData element, MongoSinkContext context) {
        switch (element.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                return processUpsert(element);
            case UPDATE_BEFORE:
            case DELETE:
                return processDelete(element);
            default:
                throw new TableException("Unsupported message kind: " + element.getRowKind());
        }
    }

    /**
     * Builds the write model for an inserted or updated row.
     *
     * <p>With a primary key the row becomes an upserting {@code $set} update filtered on
     * {@code _id} plus any extracted shard keys; without one it becomes a plain insert.
     */
    private WriteModel<BsonDocument> processUpsert(RowData row) {
        final BsonDocument document = rowDataToBsonConverter.convert(row);
        final BsonValue key = primaryKeyExtractor.apply(row);
        if (key == null) {
            return new InsertOneModel<>(document);
        }

        final BsonDocument filter = new BsonDocument(ID_FIELD, key);
        // Shard key fields are added to the filter so the upsert can be targeted on sharded
        // collections (presumably the reason the extractor exists; confirm against the caller).
        final BsonDocument shardKeysFilter = shardKeysExtractor.apply(row);
        if (!shardKeysFilter.isEmpty()) {
            filter.putAll(shardKeysFilter);
        }

        // _id is already part of the filter; it is stripped from the $set payload so the update
        // never attempts to modify the immutable _id field.
        document.remove(ID_FIELD);
        final BsonDocument update = new BsonDocument("$set", document);
        return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
    }

    /**
     * Builds the write model that deletes the document whose {@code _id} equals the row's
     * primary key.
     */
    private WriteModel<BsonDocument> processDelete(RowData row) {
        // NOTE(review): unlike processUpsert, a null key is not handled here; it is passed
        // straight to BsonDocument. Confirm callers never route key-less rows to this path.
        final BsonValue key = primaryKeyExtractor.apply(row);
        final BsonDocument filter = new BsonDocument(ID_FIELD, key);
        return new DeleteOneModel<>(filter);
    }
}

