package org.apache.flink.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.class */
/**
 * Scan-phase fetch task for a single MongoDB snapshot split.
 *
 * <p>The task reads every document of the split's range from the collection, wraps each one in
 * an insert-style envelope and enqueues it on the context's change event queue. It also
 * enqueues the low / high / end watermark events for the split on that same queue.
 */
public class MongoDBScanFetchTask extends AbstractScanFetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);

    /**
     * Creates a scan task bound to the given snapshot split.
     *
     * @param snapshotSplit the split whose range this task reads
     */
    public MongoDBScanFetchTask(SnapshotSplit snapshotSplit) {
        super(snapshotSplit);
    }

    /**
     * Reads all documents of the snapshot split and enqueues one change event per document.
     *
     * <p>The query is bounded with {@code min}/{@code max} taken from element 1 of the split
     * start / split end arrays and uses element 0 of the split start array as the index hint.
     *
     * @param context fetch context; must be a {@link MongoDBFetchTaskContext}
     * @throws InterruptedException if {@code taskRunning} is found to be {@code false} while
     *     iterating the cursor
     * @throws Exception any failure raised while querying or enqueueing; it is logged,
     *     {@code taskRunning} is set to {@code false}, and the exception is rethrown
     */
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        ChangeEventQueue<DataChangeEvent> queue = context.getQueue();
        MongoDBSourceConfig sourceConfig = ((MongoDBFetchTaskContext) context).m19getSourceConfig();
        TableId collectionId = this.snapshotSplit.getTableId();

        MongoCursor<RawBsonDocument> cursor = null;
        try {
            cursor =
                    MongoUtils.collectionFor(
                                    MongoUtils.clientFor(sourceConfig),
                                    collectionId,
                                    RawBsonDocument.class)
                            .find()
                            .min((BsonDocument) this.snapshotSplit.getSplitStart()[1])
                            .max((BsonDocument) this.snapshotSplit.getSplitEnd()[1])
                            .hint((BsonDocument) this.snapshotSplit.getSplitStart()[0])
                            .batchSize(sourceConfig.getBatchSize())
                            .noCursorTimeout(sourceConfig.disableCursorTimeout())
                            .cursor();

            while (cursor.hasNext()) {
                // Checked once per document so a stopped task aborts the scan promptly.
                if (!this.taskRunning) {
                    throw new InterruptedException(
                            "Interrupted while snapshotting collection "
                                    + collectionId.identifier());
                }

                BsonDocument valueDocument = normalizeSnapshotDocument(collectionId, cursor.next());
                // The record key reuses the envelope's wrapped id: { _id: { _id: <id> } }.
                BsonDocument keyDocument =
                        new BsonDocument(
                                MongoDBEnvelope.ID_FIELD,
                                valueDocument.get(MongoDBEnvelope.ID_FIELD));

                queue.enqueue(
                        new DataChangeEvent(
                                MongoRecordUtils.createSourceRecord(
                                        MongoRecordUtils.createPartitionMap(
                                                sourceConfig.getScheme(),
                                                sourceConfig.getHosts(),
                                                collectionId.catalog(),
                                                collectionId.table()),
                                        MongoRecordUtils.createSourceOffsetMap(
                                                keyDocument.getDocument(MongoDBEnvelope.ID_FIELD),
                                                true),
                                        collectionId.identifier(),
                                        keyDocument,
                                        valueDocument)));
            }
        } catch (Exception e) {
            this.taskRunning = false;
            LOG.error(
                    String.format(
                            "Execute snapshot read subtask for mongo split %s fail",
                            this.snapshotSplit),
                    e);
            throw e;
        } finally {
            // Single cleanup point: the cursor is closed exactly once on every exit path.
            if (cursor != null) {
                cursor.close();
            }
        }
    }

    /**
     * Runs the backfill for this split by executing a {@link MongoDBStreamFetchTask} over the
     * given stream split.
     *
     * @param context fetch context passed through to the stream task
     * @param streamSplit the stream split to read
     * @throws Exception if the stream task fails
     */
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit streamSplit) throws Exception {
        new MongoDBStreamFetchTask(streamSplit).execute(context);
    }

    /**
     * Enqueues the LOW watermark event. The split id is taken from this task's own snapshot
     * split; {@code sourceSplitBase} is not read here.
     *
     * @param context fetch context providing the queue
     * @param sourceSplitBase unused by this implementation
     * @param offset offset carried by the watermark event
     * @throws InterruptedException if enqueueing is interrupted
     */
    protected void dispatchLowWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws InterruptedException {
        enqueueWatermark(context, this.snapshotSplit.splitId(), WatermarkKind.LOW, offset);
    }

    /**
     * Enqueues the HIGH watermark event, identified by {@code sourceSplitBase}'s split id.
     *
     * @param context fetch context providing the queue
     * @param sourceSplitBase split whose id is written into the event
     * @param offset offset carried by the watermark event
     * @throws InterruptedException if enqueueing is interrupted
     */
    protected void dispatchHighWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws InterruptedException {
        enqueueWatermark(context, sourceSplitBase.splitId(), WatermarkKind.HIGH, offset);
    }

    /**
     * Enqueues the END watermark event, identified by {@code sourceSplitBase}'s split id.
     *
     * @param context fetch context providing the queue
     * @param sourceSplitBase split whose id is written into the event
     * @param offset offset carried by the watermark event
     * @throws InterruptedException if enqueueing is interrupted
     */
    protected void dispatchEndWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws InterruptedException {
        enqueueWatermark(context, sourceSplitBase.splitId(), WatermarkKind.END, offset);
    }

    /**
     * Builds a watermark event for this task's collection and puts it on the context's queue.
     * The partition map is always derived from this task's snapshot split table id.
     */
    private void enqueueWatermark(FetchTask.Context context, String splitId, WatermarkKind kind, Offset offset) throws InterruptedException {
        context.getQueue()
                .enqueue(
                        new DataChangeEvent(
                                WatermarkEvent.create(
                                        MongoRecordUtils.createWatermarkPartitionMap(
                                                this.snapshotSplit.getTableId().identifier()),
                                        MongoDBEnvelope.WATERMARK_TOPIC_NAME,
                                        splitId,
                                        kind,
                                        offset)));
    }

    /**
     * Wraps a raw snapshot document in an envelope describing it as an insert.
     *
     * <p>Fields are written in this order: wrapped id, operation type ({@code INSERT}),
     * namespace (database from {@code tableId.catalog()}, collection from
     * {@code tableId.table()}), document key, full document, timestamp (current wall-clock
     * millis), and a source sub-document marked as snapshot with timestamp {@code 0}.
     *
     * @param tableId identifies the database and collection the document came from
     * @param bsonDocument the document read from the collection
     * @return a new envelope document; {@code bsonDocument} is embedded, not copied
     */
    private BsonDocument normalizeSnapshotDocument(TableId tableId, BsonDocument bsonDocument) {
        BsonDocument envelope = new BsonDocument();

        BsonDocument wrappedId = new BsonDocument();
        wrappedId.put(MongoDBEnvelope.ID_FIELD, bsonDocument.get(MongoDBEnvelope.ID_FIELD));
        envelope.put(MongoDBEnvelope.ID_FIELD, wrappedId);

        envelope.put(
                MongoDBEnvelope.OPERATION_TYPE_FIELD,
                new BsonString(OperationType.INSERT.getValue()));

        BsonDocument namespace = new BsonDocument();
        namespace.put(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD, new BsonString(tableId.catalog()));
        namespace.put(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD, new BsonString(tableId.table()));
        envelope.put(MongoDBEnvelope.NAMESPACE_FIELD, namespace);

        envelope.put(
                MongoDBEnvelope.DOCUMENT_KEY_FIELD,
                new BsonDocument(
                        MongoDBEnvelope.ID_FIELD, bsonDocument.get(MongoDBEnvelope.ID_FIELD)));
        envelope.put(MongoDBEnvelope.FULL_DOCUMENT_FIELD, bsonDocument);
        envelope.put(
                MongoDBEnvelope.TIMESTAMP_KEY_FIELD, new BsonInt64(System.currentTimeMillis()));

        BsonDocument source = new BsonDocument();
        source.put(MongoDBEnvelope.SNAPSHOT_KEY_FIELD, new BsonString("true"));
        source.put(MongoDBEnvelope.TIMESTAMP_KEY_FIELD, new BsonInt64(0L));
        envelope.put(MongoDBEnvelope.SOURCE_FIELD, source);

        return envelope;
    }
}
