package com.ververica.cdc.connectors.mongodb.internal;

import com.mongodb.ConnectionString;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceTask;
import com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTaskContext;
import io.debezium.connector.SnapshotRecord;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.bson.conversions.Bson;
import org.bson.json.JsonReader;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.class */
public class MongoDBConnectorSourceTask extends SourceTask {
    public static final String DATABASE_INCLUDE_LIST = "database.include.list";
    public static final String COLLECTION_INCLUDE_LIST = "collection.include.list";
    private static final String TRUE = "true";
    private static final Schema HEARTBEAT_VALUE_SCHEMA = SchemaBuilder.struct().field("ts_ms", Schema.INT64_SCHEMA).build();
    private SourceRecord currentLastSnapshotRecord;
    private boolean isInSnapshotPhase = false;
    private final MongoSourceTask target = new MongoSourceTask();
    private final Field isCopyingField = MongoSourceTask.class.getDeclaredField("isCopying");

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.Task
    public String version() {
        return this.target.version();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask
    public void initialize(SourceTaskContext sourceTaskContext) {
        this.target.initialize(sourceTaskContext);
        this.context = sourceTaskContext;
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask, com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        initCapturedCollections(map);
        this.target.start(map);
        this.isInSnapshotPhase = isCopying();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask
    public void commit() throws InterruptedException {
        this.target.commit();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask
    public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
        this.target.commitRecord(sourceRecord);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        this.target.commitRecord(sourceRecord, recordMetadata);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> poll = this.target.poll();
        LinkedList linkedList = new LinkedList();
        if (this.isInSnapshotPhase) {
            if (poll != null && !poll.isEmpty()) {
                Iterator<SourceRecord> it = poll.iterator();
                while (it.hasNext()) {
                    SourceRecord markRecordTimestamp = markRecordTimestamp(it.next());
                    if (isSnapshotRecord(markRecordTimestamp)) {
                        markSnapshotRecord(markRecordTimestamp);
                        if (this.currentLastSnapshotRecord != null) {
                            linkedList.add(this.currentLastSnapshotRecord);
                        }
                        this.currentLastSnapshotRecord = markRecordTimestamp;
                    } else {
                        if (this.currentLastSnapshotRecord != null) {
                            linkedList.add(markLastSnapshotRecordOfAll(this.currentLastSnapshotRecord));
                            this.currentLastSnapshotRecord = null;
                            this.isInSnapshotPhase = false;
                        }
                        linkedList.add(markRecordTimestamp);
                    }
                }
            } else if (!isCopying()) {
                if (this.currentLastSnapshotRecord != null) {
                    linkedList.add(markLastSnapshotRecordOfAll(this.currentLastSnapshotRecord));
                    this.currentLastSnapshotRecord = null;
                }
                this.isInSnapshotPhase = false;
            }
        } else if (poll != null && !poll.isEmpty()) {
            Iterator<SourceRecord> it2 = poll.iterator();
            while (it2.hasNext()) {
                linkedList.add(markRecordTimestamp(it2.next()));
            }
        }
        return linkedList;
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceTask, com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.Task
    public void stop() {
        this.target.stop();
    }

    private SourceRecord markRecordTimestamp(SourceRecord sourceRecord) {
        return isHeartbeatRecord(sourceRecord) ? markTimestampForHeartbeatRecord(sourceRecord) : markTimestampForDataRecord(sourceRecord);
    }

    private SourceRecord markTimestampForDataRecord(SourceRecord sourceRecord) {
        String string;
        Struct struct = (Struct) sourceRecord.value();
        Struct struct2 = new Struct(struct.schema().field("source").schema());
        long j = 0;
        if (struct.schema().field(MongoDBEnvelope.CLUSTER_TIME_FIELD) != null && (string = struct.getString(MongoDBEnvelope.CLUSTER_TIME_FIELD)) != null) {
            j = new JsonReader(string).readTimestamp().getTime() * 1000;
        }
        struct2.put("ts_ms", Long.valueOf(j));
        struct.put("source", struct2);
        return sourceRecord;
    }

    private SourceRecord markTimestampForHeartbeatRecord(SourceRecord sourceRecord) {
        Struct struct = new Struct(HEARTBEAT_VALUE_SCHEMA);
        struct.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        return new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key(), HEARTBEAT_VALUE_SCHEMA, struct);
    }

    private void markSnapshotRecord(SourceRecord sourceRecord) {
        SnapshotRecord.TRUE.toSource(((Struct) sourceRecord.value()).getStruct("source"));
    }

    private SourceRecord markLastSnapshotRecordOfAll(SourceRecord sourceRecord) {
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        if (SnapshotRecord.fromSource(struct) == SnapshotRecord.TRUE) {
            SnapshotRecord.LAST.toSource(struct);
        }
        return sourceRecord;
    }

    private boolean isSnapshotRecord(SourceRecord sourceRecord) {
        return TRUE.equals(sourceRecord.sourceOffset().get(MongoDBEnvelope.COPY_KEY_FIELD));
    }

    private boolean isHeartbeatRecord(SourceRecord sourceRecord) {
        return TRUE.equals(sourceRecord.sourceOffset().get("HEARTBEAT"));
    }

    private boolean isCopying() {
        this.isCopyingField.setAccessible(true);
        try {
            return ((AtomicBoolean) this.isCopyingField.get(this.target)).get();
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Cannot access isCopying field of SourceTask", e);
        }
    }

    private void initCapturedCollections(Map<String, String> map) {
        MongoClient create;
        ConnectionString connectionString = new ConnectionString(map.get("connection.uri"));
        String str = map.get(DATABASE_INCLUDE_LIST);
        String str2 = map.get(COLLECTION_INCLUDE_LIST);
        List list = (List) Optional.ofNullable(str).map(str3 -> {
            return Arrays.asList(str3.split(","));
        }).orElse(null);
        List list2 = (List) Optional.ofNullable(str2).map(str4 -> {
            return Arrays.asList(str4.split(","));
        }).orElse(null);
        if (list2 == null) {
            if (list != null) {
                create = MongoClients.create(connectionString);
                Throwable th = null;
                try {
                    try {
                        List<String> databaseNames = CollectionDiscoveryUtils.databaseNames(create, CollectionDiscoveryUtils.databaseFilter(list));
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                create.close();
                            }
                        }
                        if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(list, databaseNames)) {
                            map.put("database", databaseNames.get(0));
                            return;
                        }
                        String str5 = (String) CollectionDiscoveryUtils.includeListAsPatterns(list).stream().map((v0) -> {
                            return v0.pattern();
                        }).collect(Collectors.joining("|"));
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(Aggregates.match(Filters.regex("ns.db", str5)));
                        map.put(MongoSourceConfig.PIPELINE_CONFIG, CollectionDiscoveryUtils.bsonListToJson(arrayList));
                        map.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, (String) databaseNames.stream().map(str6 -> {
                            return CollectionDiscoveryUtils.completionPattern(str6 + "\\..*").pattern();
                        }).collect(Collectors.joining("|")));
                        return;
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            }
            return;
        }
        create = MongoClients.create(connectionString);
        Throwable th4 = null;
        try {
            try {
                List<String> collectionNames = CollectionDiscoveryUtils.collectionNames(create, CollectionDiscoveryUtils.databaseNames(create, CollectionDiscoveryUtils.databaseFilter(list)), CollectionDiscoveryUtils.collectionsFilter(list2));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th4.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                if (CollectionDiscoveryUtils.isIncludeListExplicitlySpecified(list2, collectionNames)) {
                    MongoNamespace mongoNamespace = new MongoNamespace(collectionNames.get(0));
                    map.put("database", mongoNamespace.getDatabaseName());
                    map.put("collection", mongoNamespace.getCollectionName());
                    return;
                }
                String str7 = (String) CollectionDiscoveryUtils.includeListAsPatterns(list2).stream().map((v0) -> {
                    return v0.pattern();
                }).collect(Collectors.joining("|"));
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
                Bson regex = Filters.regex(CollectionDiscoveryUtils.ADD_NS_FIELD_NAME, str7);
                if (list != null) {
                    regex = Filters.and(Filters.regex("ns.db", (String) CollectionDiscoveryUtils.includeListAsPatterns(list).stream().map((v0) -> {
                        return v0.pattern();
                    }).collect(Collectors.joining("|"))), regex);
                }
                arrayList2.add(Aggregates.match(regex));
                map.put(MongoSourceConfig.PIPELINE_CONFIG, CollectionDiscoveryUtils.bsonListToJson(arrayList2));
                map.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, (String) collectionNames.stream().map(str8 -> {
                    return CollectionDiscoveryUtils.completionPattern(str8).pattern();
                }).collect(Collectors.joining("|")));
            } catch (Throwable th6) {
                th4 = th6;
                throw th6;
            }
        } finally {
        }
    }
}
