package com.ververica.cdc.connectors.mysql.source.reader;

import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.class */
public class MySqlSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
    private final int subtaskId;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MySqlSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> futureCompletingBlockingQueue, Supplier<MySqlSplitReader> supplier, RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext) {
        super(futureCompletingBlockingQueue, new SingleThreadFetcherManager(futureCompletingBlockingQueue, supplier::get), recordEmitter, configuration, sourceReaderContext);
        supplier.getClass();
        this.finishedUnackedSplits = new HashMap();
        this.subtaskId = sourceReaderContext.getIndexOfSubtask();
    }

    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MySqlSplitState initializedState(MySqlSplit mySqlSplit) {
        return mySqlSplit.isSnapshotSplit() ? new MySqlSnapshotSplitState(mySqlSplit.asSnapshotSplit()) : new MySqlBinlogSplitState(mySqlSplit.asBinlogSplit());
    }

    public List<MySqlSplit> snapshotState(long j) {
        List<MySqlSplit> snapshotState = super.snapshotState(j);
        snapshotState.addAll(this.finishedUnackedSplits.values());
        return snapshotState;
    }

    protected void onSplitFinished(Map<String, MySqlSplitState> map) {
        Iterator<MySqlSplitState> it = map.values().iterator();
        while (it.hasNext()) {
            MySqlSplit mySqlSplit = it.next().toMySqlSplit();
            Preconditions.checkState(mySqlSplit.isSnapshotSplit(), String.format("Only snapshot split could finish, but the actual split is binlog split %s", mySqlSplit));
            this.finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
        }
        reportFinishedSnapshotSplitsIfNeed();
        this.context.sendSplitRequest();
    }

    public void addSplits(List<MySqlSplit> list) {
        ArrayList arrayList = new ArrayList();
        for (MySqlSplit mySqlSplit : list) {
            if (mySqlSplit.isSnapshotSplit()) {
                MySqlSnapshotSplit asSnapshotSplit = mySqlSplit.asSnapshotSplit();
                if (asSnapshotSplit.isSnapshotReadFinished()) {
                    this.finishedUnackedSplits.put(asSnapshotSplit.splitId(), asSnapshotSplit);
                } else {
                    arrayList.add(mySqlSplit);
                }
            } else {
                arrayList.add(mySqlSplit);
            }
        }
        reportFinishedSnapshotSplitsIfNeed();
        super.addSplits(arrayList);
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (!(sourceEvent instanceof FinishedSnapshotSplitsAckEvent)) {
            if (!(sourceEvent instanceof FinishedSnapshotSplitsRequestEvent)) {
                super.handleSourceEvents(sourceEvent);
                return;
            } else {
                LOG.debug("The subtask {} receives request to report finished snapshot splits.", Integer.valueOf(this.subtaskId));
                reportFinishedSnapshotSplitsIfNeed();
                return;
            }
        }
        FinishedSnapshotSplitsAckEvent finishedSnapshotSplitsAckEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
        LOG.debug("The subtask {} receives ack event for {} from enumerator.", Integer.valueOf(this.subtaskId), finishedSnapshotSplitsAckEvent.getFinishedSplits());
        Iterator<String> it = finishedSnapshotSplitsAckEvent.getFinishedSplits().iterator();
        while (it.hasNext()) {
            this.finishedUnackedSplits.remove(it.next());
        }
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (this.finishedUnackedSplits.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (MySqlSnapshotSplit mySqlSnapshotSplit : this.finishedUnackedSplits.values()) {
            hashMap.put(mySqlSnapshotSplit.splitId(), mySqlSnapshotSplit.getHighWatermark());
        }
        this.context.sendSourceEventToCoordinator(new FinishedSnapshotSplitsReportEvent(hashMap));
        LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.", Integer.valueOf(this.subtaskId), hashMap);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MySqlSplit toSplitType(String str, MySqlSplitState mySqlSplitState) {
        return mySqlSplitState.toMySqlSplit();
    }
}
