package com.ververica.cdc.connectors.mysql.source.assigners;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.class */
public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<MySqlSnapshotSplit> remainingSplits;
    private final Map<String, MySqlSnapshotSplit> assignedSplits;
    private final Map<String, BinlogOffset> splitFinishedOffsets;
    private boolean assignerFinished;
    private final Configuration configuration;
    private final int currentParallelism;
    private final LinkedList<TableId> remainingTables;
    private final RelationalTableFilters tableFilters;
    private final int chunkSize;
    private MySqlConnection jdbc;
    private ChunkSplitter chunkSplitter;

    @Nullable
    private Long checkpointIdToFinish;

    public MySqlSnapshotSplitAssigner(Configuration configuration, int i) {
        this(configuration, i, new ArrayList(), new ArrayList(), new HashMap(), new HashMap(), false);
    }

    public MySqlSnapshotSplitAssigner(Configuration configuration, int i, SnapshotPendingSplitsState snapshotPendingSplitsState) {
        this(configuration, i, snapshotPendingSplitsState.getAlreadyProcessedTables(), snapshotPendingSplitsState.getRemainingSplits(), snapshotPendingSplitsState.getAssignedSplits(), snapshotPendingSplitsState.getSplitFinishedOffsets(), snapshotPendingSplitsState.isAssignerFinished());
    }

    private MySqlSnapshotSplitAssigner(Configuration configuration, int i, List<TableId> list, List<MySqlSnapshotSplit> list2, Map<String, MySqlSnapshotSplit> map, Map<String, BinlogOffset> map2, boolean z) {
        this.configuration = configuration;
        this.currentParallelism = i;
        this.alreadyProcessedTables = list;
        this.remainingSplits = list2;
        this.assignedSplits = map;
        this.splitFinishedOffsets = map2;
        this.assignerFinished = z;
        this.remainingTables = new LinkedList<>();
        this.tableFilters = DebeziumUtils.createTableFilters(configuration);
        this.chunkSize = ((Integer) configuration.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)).intValue();
        Preconditions.checkState(this.chunkSize > 1, String.format("The value of option '%s' must larger than 1, but is %d", MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.key(), Integer.valueOf(this.chunkSize)));
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void open() {
        this.jdbc = DebeziumUtils.openMySqlConnection(this.configuration);
        this.chunkSplitter = createChunkSplitter(this.configuration, this.jdbc, this.chunkSize);
        if (this.assignerFinished) {
            return;
        }
        this.remainingTables.addAll(discoverCapturedTables());
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public Optional<MySqlSplit> getNext() {
        if (!this.remainingSplits.isEmpty()) {
            Iterator<MySqlSnapshotSplit> it = this.remainingSplits.iterator();
            MySqlSnapshotSplit next = it.next();
            it.remove();
            this.assignedSplits.put(next.splitId(), next);
            return Optional.of(next);
        }
        TableId pollFirst = this.remainingTables.pollFirst();
        if (pollFirst == null) {
            return Optional.empty();
        }
        this.remainingSplits.addAll(this.chunkSplitter.generateSplits(pollFirst));
        this.alreadyProcessedTables.add(pollFirst);
        return getNext();
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public boolean waitingForFinishedSplits() {
        return !allSplitsFinished();
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onFinishedSplits(Map<String, BinlogOffset> map) {
        this.splitFinishedOffsets.putAll(map);
        if (allSplitsFinished()) {
            if (this.currentParallelism != 1) {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            } else {
                this.assignerFinished = true;
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            }
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void addSplits(Collection<MySqlSplit> collection) {
        for (MySqlSplit mySqlSplit : collection) {
            this.remainingSplits.add(mySqlSplit.asSnapshotSplit());
            this.assignedSplits.remove(mySqlSplit.splitId());
            this.splitFinishedOffsets.remove(mySqlSplit.splitId());
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public SnapshotPendingSplitsState snapshotState(long j) {
        SnapshotPendingSplitsState snapshotPendingSplitsState = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.splitFinishedOffsets, this.assignerFinished);
        if (this.checkpointIdToFinish == null && !this.assignerFinished && allSplitsFinished()) {
            this.checkpointIdToFinish = Long.valueOf(j);
        }
        return snapshotPendingSplitsState;
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void notifyCheckpointComplete(long j) {
        if (this.checkpointIdToFinish == null || this.assignerFinished || !allSplitsFinished()) {
            return;
        }
        this.assignerFinished = j >= this.checkpointIdToFinish.longValue();
        LOG.info("Snapshot split assigner is turn into finished status.");
    }

    @Override // com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void close() {
        if (this.jdbc != null) {
            DebeziumUtils.closeMySqlConnection(this.jdbc);
        }
    }

    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public boolean isFinished() {
        return this.assignerFinished;
    }

    public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    public Map<String, BinlogOffset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSplitsFinished() {
        return noMoreSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }

    private List<TableId> discoverCapturedTables() {
        try {
            List<TableId> listTables = TableDiscoveryUtils.listTables(this.jdbc, this.tableFilters);
            if (listTables.isEmpty()) {
                throw new IllegalArgumentException(String.format("Can't find any matched tables, please check your configured database-name: %s and table-name: %s", this.configuration.get(MySqlSourceOptions.DATABASE_NAME), this.configuration.get(MySqlSourceOptions.TABLE_NAME)));
            }
            return listTables;
        } catch (SQLException e) {
            throw new FlinkRuntimeException("Failed to discover captured tables", e);
        }
    }

    private static ChunkSplitter createChunkSplitter(Configuration configuration, MySqlConnection mySqlConnection, int i) {
        return new ChunkSplitter(mySqlConnection, new MySqlSchema(StatefulTaskContext.toDebeziumConfig(configuration), mySqlConnection), i);
    }
}
