package com.ververica.cdc.connectors.base.source.assigner;

import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.assigner.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.class */
public class SnapshotSplitAssigner implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<SnapshotSplit> remainingSplits;
    private final Map<String, SnapshotSplit> assignedSplits;
    private final Map<String, Offset> splitFinishedOffsets;
    private boolean assignerFinished;
    private final SourceConfig sourceConfig;
    private final int currentParallelism;
    private final LinkedList<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;

    @Nullable
    private Long checkpointIdToFinish;
    private final DataSourceDialect dialect;
    private OffsetFactory offsetFactory;

    public SnapshotSplitAssigner(SourceConfig sourceConfig, int i, List<TableId> list, boolean z, DataSourceDialect dataSourceDialect, OffsetFactory offsetFactory) {
        this(sourceConfig, i, new ArrayList(), new ArrayList(), new HashMap(), new HashMap(), false, list, z, true, dataSourceDialect, offsetFactory);
    }

    public SnapshotSplitAssigner(SourceConfig sourceConfig, int i, SnapshotPendingSplitsState snapshotPendingSplitsState, DataSourceDialect dataSourceDialect, OffsetFactory offsetFactory) {
        this(sourceConfig, i, snapshotPendingSplitsState.getAlreadyProcessedTables(), snapshotPendingSplitsState.getRemainingSplits(), snapshotPendingSplitsState.getAssignedSplits(), snapshotPendingSplitsState.getSplitFinishedOffsets(), snapshotPendingSplitsState.isAssignerFinished(), snapshotPendingSplitsState.getRemainingTables(), snapshotPendingSplitsState.isTableIdCaseSensitive(), snapshotPendingSplitsState.isRemainingTablesCheckpointed(), dataSourceDialect, offsetFactory);
    }

    private SnapshotSplitAssigner(SourceConfig sourceConfig, int i, List<TableId> list, List<SnapshotSplit> list2, Map<String, SnapshotSplit> map, Map<String, Offset> map2, boolean z, List<TableId> list3, boolean z2, boolean z3, DataSourceDialect dataSourceDialect, OffsetFactory offsetFactory) {
        this.sourceConfig = sourceConfig;
        this.currentParallelism = i;
        this.alreadyProcessedTables = list;
        this.remainingSplits = list2;
        this.assignedSplits = map;
        this.splitFinishedOffsets = map2;
        this.assignerFinished = z;
        this.remainingTables = new LinkedList<>(list3);
        this.isRemainingTablesCheckpointed = z3;
        this.isTableIdCaseSensitive = z2;
        this.dialect = dataSourceDialect;
        this.offsetFactory = offsetFactory;
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public void open() {
        this.chunkSplitter = this.dialect.createChunkSplitter(this.sourceConfig);
        if (this.isRemainingTablesCheckpointed || this.assignerFinished) {
            return;
        }
        try {
            List discoverDataCollections = this.dialect.discoverDataCollections(this.sourceConfig);
            discoverDataCollections.removeAll(this.alreadyProcessedTables);
            this.remainingTables.addAll(discoverDataCollections);
            this.isTableIdCaseSensitive = this.dialect.isDataCollectionIdCaseSensitive(this.sourceConfig);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
        }
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public Optional<SourceSplitBase> getNext() {
        if (!this.remainingSplits.isEmpty()) {
            Iterator<SnapshotSplit> it = this.remainingSplits.iterator();
            SnapshotSplit next = it.next();
            it.remove();
            this.assignedSplits.put(next.splitId(), next);
            return Optional.of(next);
        }
        TableId pollFirst = this.remainingTables.pollFirst();
        if (pollFirst == null) {
            return Optional.empty();
        }
        this.remainingSplits.addAll(this.chunkSplitter.generateSplits(pollFirst));
        this.alreadyProcessedTables.add(pollFirst);
        return getNext();
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public boolean waitingForFinishedSplits() {
        return !allSplitsFinished();
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        if (waitingForFinishedSplits()) {
            LOG.error("The assigner is not ready to offer finished split information, this should not be called");
            throw new FlinkRuntimeException("The assigner is not ready to offer finished split information, this should not be called");
        }
        List<SnapshotSplit> list = (List) this.assignedSplits.values().stream().sorted(Comparator.comparing((v0) -> {
            return v0.splitId();
        })).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        for (SnapshotSplit snapshotSplit : list) {
            arrayList.add(new FinishedSnapshotSplitInfo(snapshotSplit.getTableId(), snapshotSplit.splitId(), snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd(), this.splitFinishedOffsets.get(snapshotSplit.splitId()), this.offsetFactory));
        }
        return arrayList;
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public void onFinishedSplits(Map<String, Offset> map) {
        this.splitFinishedOffsets.putAll(map);
        if (allSplitsFinished()) {
            if (this.currentParallelism != 1) {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            } else {
                this.assignerFinished = true;
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            }
        }
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public void addSplits(Collection<SourceSplitBase> collection) {
        for (SourceSplitBase sourceSplitBase : collection) {
            this.remainingSplits.add(sourceSplitBase.asSnapshotSplit());
            this.assignedSplits.remove(sourceSplitBase.splitId());
            this.splitFinishedOffsets.remove(sourceSplitBase.splitId());
        }
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public SnapshotPendingSplitsState snapshotState(long j) {
        SnapshotPendingSplitsState snapshotPendingSplitsState = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.splitFinishedOffsets, this.assignerFinished, this.remainingTables, this.isTableIdCaseSensitive, true);
        if (this.checkpointIdToFinish == null && !this.assignerFinished && allSplitsFinished()) {
            this.checkpointIdToFinish = Long.valueOf(j);
        }
        return snapshotPendingSplitsState;
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public void notifyCheckpointComplete(long j) {
        if (this.checkpointIdToFinish == null || this.assignerFinished || !allSplitsFinished()) {
            return;
        }
        this.assignerFinished = j >= this.checkpointIdToFinish.longValue();
        LOG.info("Snapshot split assigner is turn into finished status.");
    }

    @Override // com.ververica.cdc.connectors.base.source.assigner.SplitAssigner
    public void close() {
    }

    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public boolean isFinished() {
        return this.assignerFinished;
    }

    public Map<String, SnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    public Map<String, Offset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSplitsFinished() {
        return noMoreSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }
}
