package com.ververica.cdc.connectors.mysql.source.reader;

import com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.mysql.MySqlConnection;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.class */
public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class);
    private final Queue<MySqlSplit> splits = new ArrayDeque();
    private final Configuration config;
    private final int subtaskId;

    @Nullable
    private DebeziumReader<SourceRecord, MySqlSplit> currentReader;

    @Nullable
    private String currentSplitId;

    public MySqlSplitReader(Configuration configuration, int i) {
        this.config = configuration;
        this.subtaskId = i;
    }

    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        checkSplitOrStartNext();
        try {
            Iterator<SourceRecord> pollSplitRecords = this.currentReader.pollSplitRecords();
            return pollSplitRecords == null ? finishedSnapshotSplit() : MySqlRecords.forRecords(this.currentSplitId, pollSplitRecords);
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
    }

    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChange);
        this.splits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        if (this.currentReader != null) {
            LOG.info("Close current debezium reader {}", this.currentReader.getClass().getCanonicalName());
            this.currentReader.close();
            this.currentSplitId = null;
        }
    }

    private void checkSplitOrStartNext() throws IOException {
        if (!(this.currentReader instanceof BinlogSplitReader) && canAssignNextSplit()) {
            MySqlSplit poll = this.splits.poll();
            if (poll == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            this.currentSplitId = poll.splitId();
            if (!poll.isSnapshotSplit()) {
                if (this.currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    this.currentReader.close();
                }
                MySqlConnection connection = StatefulTaskContext.getConnection(this.config);
                StatefulTaskContext statefulTaskContext = new StatefulTaskContext(this.config, StatefulTaskContext.getBinaryClient(this.config), connection);
                LOG.info("Create binlog reader");
                this.currentReader = new BinlogSplitReader(statefulTaskContext, this.subtaskId);
            } else if (this.currentReader == null) {
                MySqlConnection connection2 = StatefulTaskContext.getConnection(this.config);
                this.currentReader = new SnapshotSplitReader(new StatefulTaskContext(this.config, StatefulTaskContext.getBinaryClient(this.config), connection2), this.subtaskId);
            }
            this.currentReader.submitSplit(poll);
        }
    }

    private boolean canAssignNextSplit() {
        return this.currentReader == null || this.currentReader.isFinished();
    }

    private MySqlRecords finishedSnapshotSplit() {
        MySqlRecords forFinishedSplit = MySqlRecords.forFinishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return forFinishedSplit;
    }
}
