package com.hazelcast.jet.sql.impl.connector.mongodb;

import com.hazelcast.core.HazelcastJsonValue;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.mongodb.impl.ReadMongoP;
import com.hazelcast.jet.mongodb.impl.ReadMongoParams;
import com.hazelcast.shaded.com.google.common.base.Preconditions;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.type.QueryDataType;
import com.hazelcast.sql.impl.type.QueryDataTypeFamily;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;

/* loaded from: input_file:com/hazelcast/jet/sql/impl/connector/mongodb/SelectProcessorSupplier.class */
public class SelectProcessorSupplier implements ProcessorSupplier {
    private transient SupplierEx<? extends MongoClient> clientSupplier;
    private final String databaseName;
    private final String collectionName;
    private final boolean stream;
    private final FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider;
    private final Document predicate;
    private final List<String> projection;
    private final Long startAt;
    private final String connectionString;
    private final String dataLinkName;
    private transient ExpressionEvalContext evalContext;
    private final QueryDataType[] types;

    SelectProcessorSupplier(MongoTable mongoTable, Document document, List<String> list, BsonTimestamp bsonTimestamp, boolean z, FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> functionEx) {
        Preconditions.checkArgument((list == null || list.isEmpty()) ? false : true, "projection cannot be empty");
        this.predicate = document;
        this.projection = list;
        this.connectionString = mongoTable.connectionString;
        this.dataLinkName = mongoTable.dataLinkName;
        this.databaseName = mongoTable.databaseName;
        this.collectionName = mongoTable.collectionName;
        this.startAt = bsonTimestamp == null ? null : Long.valueOf(bsonTimestamp.getValue());
        this.stream = z;
        this.eventTimePolicyProvider = functionEx;
        this.types = mongoTable.resolveColumnTypes(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SelectProcessorSupplier(MongoTable mongoTable, Document document, List<String> list, BsonTimestamp bsonTimestamp, FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> functionEx) {
        this(mongoTable, document, list, bsonTimestamp, true, functionEx);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SelectProcessorSupplier(MongoTable mongoTable, Document document, List<String> list) {
        this(mongoTable, document, list, null, false, null);
    }

    public void init(@Nonnull ProcessorSupplier.Context context) {
        if (this.connectionString != null) {
            this.clientSupplier = () -> {
                return MongoClients.create(this.connectionString);
            };
        }
        this.evalContext = ExpressionEvalContext.from(context);
    }

    @Nonnull
    public Collection<? extends Processor> get(int i) {
        ArrayList arrayList = new ArrayList();
        if (this.predicate != null) {
            arrayList.add(Aggregates.match(ParameterReplacer.replacePlaceholders(this.predicate, this.evalContext).toBsonDocument()));
        }
        Bson include = Projections.include(this.projection);
        if (this.projection.contains("_id") || this.stream) {
            arrayList.add(Aggregates.project(include));
        } else {
            arrayList.add(Aggregates.project(Projections.fields(new Bson[]{Projections.excludeId(), include})));
        }
        ArrayList arrayList2 = new ArrayList();
        EventTimePolicy noEventTime = this.eventTimePolicyProvider == null ? EventTimePolicy.noEventTime() : (EventTimePolicy) this.eventTimePolicyProvider.apply(this.evalContext);
        SupplierEx<? extends MongoClient> supplierEx = this.clientSupplier;
        for (int i2 = 0; i2 < i; i2++) {
            arrayList2.add(new ReadMongoP(new ReadMongoParams(this.stream).setClientSupplier(supplierEx).setDataLinkRef(this.dataLinkName).setAggregates(arrayList).setDatabaseName(this.databaseName).setCollectionName(this.collectionName).setMapItemFn(this::convertDocToRow).setMapStreamFn(this::convertStreamDocToRow).setStartAtTimestamp(this.startAt == null ? null : new BsonTimestamp(this.startAt.longValue())).setEventTimePolicy(noEventTime)));
        }
        return arrayList2;
    }

    private JetSqlRow convertDocToRow(Document document) {
        Object[] objArr = new Object[document.size()];
        for (Map.Entry entry : document.entrySet()) {
            int indexInProjection = indexInProjection((String) entry.getKey());
            objArr[indexInProjection] = convert(entry.getValue(), indexInProjection);
        }
        return new JetSqlRow(this.evalContext.getSerializationService(), objArr);
    }

    private Object convert(Object obj, int i) {
        Object unwrap = BsonTypes.unwrap(obj);
        return ((unwrap instanceof Document) && this.types[i].getTypeFamily() == QueryDataTypeFamily.JSON) ? new HazelcastJsonValue(((Document) unwrap).toJson()) : this.types[i].convert(unwrap);
    }

    private JetSqlRow convertStreamDocToRow(ChangeStreamDocument<Document> changeStreamDocument) {
        Document document = (Document) changeStreamDocument.getFullDocument();
        Objects.requireNonNull(document, "Document is empty");
        Object[] objArr = new Object[this.projection.size()];
        for (Map.Entry entry : document.entrySet()) {
            int indexInProjection = indexInProjection((String) entry.getKey());
            if (indexInProjection != -1) {
                objArr[indexInProjection] = convert(entry.getValue(), indexInProjection);
            }
        }
        addIfInProjection(changeStreamDocument.getOperationType().getValue(), "operationType", objArr);
        addIfInProjection(changeStreamDocument.getResumeToken().toString(), "resumeToken", objArr);
        return new JetSqlRow(this.evalContext.getSerializationService(), objArr);
    }

    private void addIfInProjection(Object obj, String str, Object[] objArr) {
        int indexInProjection = indexInProjection(str);
        if (indexInProjection == -1) {
            return;
        }
        objArr[indexInProjection] = obj;
    }

    private int indexInProjection(String str) {
        int indexOf = this.projection.indexOf(str);
        if (indexOf == -1) {
            indexOf = this.projection.indexOf("fullDocument." + str);
        }
        return indexOf;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1567913510:
                if (implMethodName.equals("convertDocToRow")) {
                    z = false;
                    break;
                }
                break;
            case -1507323078:
                if (implMethodName.equals("convertStreamDocToRow")) {
                    z = true;
                    break;
                }
                break;
            case 989243930:
                if (implMethodName.equals("lambda$init$275d807a$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/sql/impl/connector/mongodb/SelectProcessorSupplier") && serializedLambda.getImplMethodSignature().equals("(Lorg/bson/Document;)Lcom/hazelcast/sql/impl/row/JetSqlRow;")) {
                    SelectProcessorSupplier selectProcessorSupplier = (SelectProcessorSupplier) serializedLambda.getCapturedArg(0);
                    return selectProcessorSupplier::convertDocToRow;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/sql/impl/connector/mongodb/SelectProcessorSupplier") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/client/model/changestream/ChangeStreamDocument;)Lcom/hazelcast/sql/impl/row/JetSqlRow;")) {
                    SelectProcessorSupplier selectProcessorSupplier2 = (SelectProcessorSupplier) serializedLambda.getCapturedArg(0);
                    return selectProcessorSupplier2::convertStreamDocToRow;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/sql/impl/connector/mongodb/SelectProcessorSupplier") && serializedLambda.getImplMethodSignature().equals("()Lcom/mongodb/client/MongoClient;")) {
                    SelectProcessorSupplier selectProcessorSupplier3 = (SelectProcessorSupplier) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return MongoClients.create(this.connectionString);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
