package com.ververica.cdc.connectors.mongodb.source.reader;

import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SampleBucketSplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.ShardedSplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SingleSplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SplitContext;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SplitVectorSplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
import com.ververica.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import io.debezium.relational.TableId;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.class */
public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBSnapshotSplitReaderTest.class);
    private static final int MAX_RETRY_TIMES = 100;
    private String database;
    private MongoDBSourceConfig sourceConfig;
    private MongoDBDialect dialect;
    private SplitContext splitContext;

    @Before
    public void before() {
        this.database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
        this.sourceConfig = new MongoDBSourceConfigFactory().hosts(CONTAINER.getHostAndPort()).databaseList(new String[]{this.database}).collectionList(new String[]{this.database + ".shopping_cart"}).username("flinkuser").password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;").splitSizeMB(1).samplesPerChunk(10).pollAwaitTimeMillis(500).create(0);
        this.dialect = new MongoDBDialect();
        this.splitContext = SplitContext.of(this.sourceConfig, new TableId(this.database, (String) null, "shopping_cart"));
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithShardedSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(ShardedSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSplitVectorSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SplitVectorSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSamplerSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SampleBucketSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSingleSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SingleSplitStrategy.INSTANCE);
    }

    private void testMongoDBSnapshotSplitReader(SplitStrategy splitStrategy) throws Exception {
        LinkedList linkedList = new LinkedList(splitStrategy.split(this.splitContext));
        Assert.assertTrue(linkedList.size() > 0);
        IncrementalSourceSplitReader incrementalSourceSplitReader = new IncrementalSourceSplitReader(0, this.dialect, this.sourceConfig, SnapshotPhaseHooks.empty());
        long j = 0;
        for (int i = 0; i < MAX_RETRY_TIMES; i++) {
            try {
                if (!linkedList.isEmpty() && incrementalSourceSplitReader.canAssignNextSplit()) {
                    SnapshotSplit snapshotSplit = (SnapshotSplit) linkedList.poll();
                    LOG.info("Add snapshot split {}", snapshotSplit.splitId());
                    incrementalSourceSplitReader.handleSplitsChanges(new SplitsAddition(Collections.singletonList(snapshotSplit)));
                }
                ChangeEventRecords fetch = incrementalSourceSplitReader.fetch();
                if (fetch.nextSplit() != null) {
                    while (true) {
                        SourceRecords nextRecordFromSplit = fetch.nextRecordFromSplit();
                        if (nextRecordFromSplit != null) {
                            Iterator it = nextRecordFromSplit.iterator();
                            while (it.hasNext()) {
                                SourceRecord sourceRecord = (SourceRecord) it.next();
                                if (!WatermarkEvent.isWatermarkEvent(sourceRecord)) {
                                    BsonDocument parse = BsonDocument.parse(((Struct) sourceRecord.value()).getString("fullDocument"));
                                    long longValue = parse.getInt64("product_no").longValue();
                                    String value = parse.getString("product_kind").getValue();
                                    String value2 = parse.getString("user_id").getValue();
                                    String value3 = parse.getString("description").getValue();
                                    Assert.assertEquals("KIND_" + longValue, value);
                                    Assert.assertEquals("user_" + longValue, value2);
                                    Assert.assertEquals("my shopping cart " + longValue, value3);
                                    j++;
                                }
                            }
                        }
                    }
                } else if (linkedList.isEmpty() && incrementalSourceSplitReader.canAssignNextSplit()) {
                    break;
                }
                Thread.sleep(300L);
            } finally {
                incrementalSourceSplitReader.close();
            }
        }
        Assert.assertEquals(this.splitContext.getDocumentCount(), j);
    }
}
