package org.apache.flink.cdc.connectors.base.source.reader;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsReportEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplitState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.class */
public class IncrementalSourceReader<T, C extends SourceConfig> extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceReader.class);
    private final Map<String, SnapshotSplit> finishedUnackedSplits;
    protected final Map<String, StreamSplit> uncompletedStreamSplits;
    protected volatile StreamSplit suspendedStreamSplit;
    private final int subtaskId;
    private final SourceSplitSerializer sourceSplitSerializer;
    protected final C sourceConfig;
    protected final DataSourceDialect<C> dialect;
    private final IncrementalSourceReaderContext incrementalSourceReaderContext;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public IncrementalSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> futureCompletingBlockingQueue, Supplier<IncrementalSourceSplitReader<C>> supplier, RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter, Configuration configuration, IncrementalSourceReaderContext incrementalSourceReaderContext, C c, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect<C> dataSourceDialect) {
        super(futureCompletingBlockingQueue, new SingleThreadFetcherManager(futureCompletingBlockingQueue, supplier::get), recordEmitter, configuration, incrementalSourceReaderContext.getSourceReaderContext());
        supplier.getClass();
        this.sourceConfig = c;
        this.finishedUnackedSplits = new HashMap();
        this.uncompletedStreamSplits = new HashMap();
        this.subtaskId = this.context.getIndexOfSubtask();
        this.sourceSplitSerializer = (SourceSplitSerializer) Preconditions.checkNotNull(sourceSplitSerializer);
        this.dialect = dataSourceDialect;
        this.incrementalSourceReaderContext = incrementalSourceReaderContext;
        this.suspendedStreamSplit = null;
    }

    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() <= 1) {
            this.context.sendSplitRequest();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceSplitState initializedState(SourceSplitBase sourceSplitBase) {
        return sourceSplitBase.isSnapshotSplit() ? new SnapshotSplitState(sourceSplitBase.asSnapshotSplit()) : new StreamSplitState(sourceSplitBase.asStreamSplit());
    }

    public List<SourceSplitBase> snapshotState(long j) {
        List<SourceSplitBase> list = (List) super.snapshotState(j).stream().filter(sourceSplitBase -> {
            return !this.finishedUnackedSplits.containsKey(sourceSplitBase.splitId());
        }).collect(Collectors.toList());
        list.addAll(this.finishedUnackedSplits.values());
        list.addAll(this.uncompletedStreamSplits.values());
        if (this.suspendedStreamSplit != null) {
            list.add(this.suspendedStreamSplit);
        }
        logCurrentStreamOffsets(list, j);
        return list;
    }

    protected void onSplitFinished(Map<String, SourceSplitState> map) {
        boolean z = true;
        if (isNewlyAddedTableSplitAndStreamSplit(map)) {
            SourceSplitState remove = map.remove(StreamSplit.STREAM_SPLIT_ID);
            map.values().forEach(sourceSplitState -> {
                this.finishedUnackedSplits.put(sourceSplitState.toSourceSplit().splitId(), sourceSplitState.asSnapshotSplitState().toSourceSplit());
            });
            Preconditions.checkState(map.values().size() == 1);
            LOG.info("Source reader {} finished stream split and snapshot split {}", Integer.valueOf(this.subtaskId), map.values().iterator().next().toSourceSplit().splitId());
            addSplits(Collections.singletonList(remove.toSourceSplit()));
        } else {
            Preconditions.checkState(map.size() == 1);
            Iterator<SourceSplitState> it = map.values().iterator();
            while (it.hasNext()) {
                SourceSplitBase sourceSplit = it.next().toSourceSplit();
                if (!sourceSplit.isStreamSplit()) {
                    this.finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
                } else if (this.incrementalSourceReaderContext.isStreamSplitReaderSuspended()) {
                    this.suspendedStreamSplit = StreamSplit.toSuspendedStreamSplit(sourceSplit.asStreamSplit());
                    LOG.info("Source reader {} suspended stream split reader success after the newly added table process, current offset {}", Integer.valueOf(this.subtaskId), this.suspendedStreamSplit.getStartingOffset());
                    this.context.sendSourceEventToCoordinator(new LatestFinishedSplitsNumberRequestEvent());
                    z = false;
                }
            }
            reportFinishedSnapshotSplitsIfNeed();
        }
        if (z) {
            this.context.sendSplitRequest();
        }
    }

    private boolean isNewlyAddedTableSplitAndStreamSplit(Map<String, SourceSplitState> map) {
        return map.containsKey(StreamSplit.STREAM_SPLIT_ID) && map.size() == 2;
    }

    public void addSplits(List<SourceSplitBase> list) {
        addSplits(list, true);
    }

    private void addSplits(List<SourceSplitBase> list, boolean z) {
        ArrayList arrayList = new ArrayList();
        for (SourceSplitBase sourceSplitBase : list) {
            if (sourceSplitBase.isSnapshotSplit()) {
                SnapshotSplit asSnapshotSplit = sourceSplitBase.asSnapshotSplit();
                if (!this.dialect.isIncludeDataCollection(this.sourceConfig, asSnapshotSplit.getTableId())) {
                    LOG.info("The subtask {} is skipping split {} because it does not match new table filter.", Integer.valueOf(this.subtaskId), sourceSplitBase.splitId());
                } else if (asSnapshotSplit.isSnapshotReadFinished()) {
                    this.finishedUnackedSplits.put(asSnapshotSplit.splitId(), asSnapshotSplit);
                } else {
                    arrayList.add(sourceSplitBase);
                }
            } else {
                StreamSplit asStreamSplit = sourceSplitBase.asStreamSplit();
                if (z) {
                    LOG.info("before checkTableChangeForStreamSplit: " + asStreamSplit);
                    asStreamSplit = StreamSplit.filterOutdatedSplitInfos(asStreamSplit, tableId -> {
                        return this.dialect.isIncludeDataCollection(this.sourceConfig, tableId);
                    });
                    LOG.info("after checkTableChangeForStreamSplit: " + asStreamSplit);
                }
                boolean z2 = !this.incrementalSourceReaderContext.isHasAssignedStreamSplit() && this.sourceConfig.isScanNewlyAddedTableEnabled();
                this.incrementalSourceReaderContext.setHasAssignedStreamSplit(true);
                if (asStreamSplit.isSuspended()) {
                    this.suspendedStreamSplit = asStreamSplit;
                } else if (asStreamSplit.isCompletedSplit()) {
                    this.uncompletedStreamSplits.remove(sourceSplitBase.splitId());
                    asStreamSplit = discoverTableSchemasForStreamSplit(asStreamSplit, z2);
                    arrayList.add(asStreamSplit);
                } else {
                    this.uncompletedStreamSplits.put(sourceSplitBase.splitId(), sourceSplitBase.asStreamSplit());
                    requestStreamSplitMetaIfNeeded(sourceSplitBase.asStreamSplit());
                }
                LOG.info("Source reader {} received the stream split : {}.", Integer.valueOf(this.subtaskId), asStreamSplit);
                this.context.sendSourceEventToCoordinator(new StreamSplitAssignedEvent());
            }
        }
        reportFinishedSnapshotSplitsIfNeed();
        if (!arrayList.isEmpty()) {
            super.addSplits(arrayList);
        } else if (this.suspendedStreamSplit != null || getNumberOfCurrentlyAssignedSplits() <= 1) {
            this.context.sendSplitRequest();
        }
    }

    private StreamSplit discoverTableSchemasForStreamSplit(StreamSplit streamSplit, boolean z) {
        String splitId = streamSplit.splitId();
        if (!streamSplit.getTableSchemas().isEmpty() && !z) {
            LOG.warn("Source reader {} skip the table schema discovery, the stream split {} has table schemas yet.", Integer.valueOf(this.subtaskId), streamSplit);
            return streamSplit;
        }
        try {
            Map<? extends TableId, ? extends TableChanges.TableChange> tableSchemas = streamSplit.getTableSchemas();
            Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas = this.dialect.discoverDataCollectionSchemas(this.sourceConfig);
            discoverDataCollectionSchemas.putAll(tableSchemas);
            LOG.info("Source reader {} discovers table schema for stream split {} success", Integer.valueOf(this.subtaskId), splitId);
            return StreamSplit.fillTableSchemas(streamSplit, discoverDataCollectionSchemas);
        } catch (Exception e) {
            LOG.error("Source reader {} failed to obtains table schemas due to {}", Integer.valueOf(this.subtaskId), e.getMessage());
            throw new FlinkRuntimeException(e);
        }
    }

    private Set<String> getExistedSplitsOfLastGroup(List<FinishedSnapshotSplitInfo> list, int i) {
        int size = list.size() % this.sourceConfig.getSplitMetaGroupSize();
        if (size == 0) {
            return new HashSet();
        }
        int size2 = (list.size() / this.sourceConfig.getSplitMetaGroupSize()) * i;
        return new HashSet(((List) list.stream().map((v0) -> {
            return v0.getSplitId();
        }).sorted().collect(Collectors.toList())).subList(size2, size2 + size));
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent finishedSnapshotSplitsAckEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug("The subtask {} receives ack event for {} from enumerator.", Integer.valueOf(this.subtaskId), finishedSnapshotSplitsAckEvent.getFinishedSplits());
            Iterator<String> it = finishedSnapshotSplitsAckEvent.getFinishedSplits().iterator();
            while (it.hasNext()) {
                this.finishedUnackedSplits.remove(it.next());
            }
            return;
        }
        if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug("The subtask {} receives request to report finished snapshot splits.", Integer.valueOf(this.subtaskId));
            reportFinishedSnapshotSplitsIfNeed();
            return;
        }
        if (sourceEvent instanceof StreamSplitMetaEvent) {
            LOG.debug("The subtask {} receives stream meta with group id {}.", Integer.valueOf(this.subtaskId), Integer.valueOf(((StreamSplitMetaEvent) sourceEvent).getMetaGroupId()));
            fillMetaDataForStreamSplit((StreamSplitMetaEvent) sourceEvent);
        } else if (sourceEvent instanceof StreamSplitUpdateRequestEvent) {
            suspendStreamSplitReader();
        } else if (sourceEvent instanceof LatestFinishedSplitsNumberEvent) {
            updateStreamSplitFinishedSplitsSize((LatestFinishedSplitsNumberEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    private void suspendStreamSplitReader() {
        this.incrementalSourceReaderContext.suspendStreamSplitReader();
    }

    private void fillMetaDataForStreamSplit(StreamSplitMetaEvent streamSplitMetaEvent) {
        StreamSplit streamSplit = this.uncompletedStreamSplits.get(streamSplitMetaEvent.getSplitId());
        if (streamSplit == null) {
            LOG.warn("Received metadata event for split {}, but the uncompleted split map does not contain it", streamSplitMetaEvent.getSplitId());
            return;
        }
        int metaGroupId = streamSplitMetaEvent.getMetaGroupId();
        int totalFinishedSplitSize = streamSplitMetaEvent.getTotalFinishedSplitSize();
        int nextMetaGroupId = getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize());
        if (totalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) {
            LOG.warn("Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it", new Object[]{Integer.valueOf(this.subtaskId), Integer.valueOf(totalFinishedSplitSize), Integer.valueOf(streamSplit.getTotalFinishedSplitSize())});
            streamSplit = StreamSplit.toNormalStreamSplit(streamSplit, totalFinishedSplitSize);
            this.uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit);
        } else if (metaGroupId == nextMetaGroupId) {
            Set<String> existedSplitsOfLastGroup = getExistedSplitsOfLastGroup(streamSplit.getFinishedSnapshotSplitInfos(), this.sourceConfig.getSplitMetaGroupSize());
            Stream<byte[]> stream = streamSplitMetaEvent.getMetaGroup().stream();
            SourceSplitSerializer sourceSplitSerializer = this.sourceSplitSerializer;
            sourceSplitSerializer.getClass();
            List list = (List) stream.map(sourceSplitSerializer::deserialize).filter(finishedSnapshotSplitInfo -> {
                return !existedSplitsOfLastGroup.contains(finishedSnapshotSplitInfo.getSplitId());
            }).collect(Collectors.toList());
            this.uncompletedStreamSplits.put(streamSplit.splitId(), StreamSplit.appendFinishedSplitInfos(streamSplit, list));
            LOG.info("Fill metadata of group {} to stream split", Integer.valueOf(list.size()));
        } else {
            LOG.warn("Received out of oder metadata event for split {}, the received meta group id is {}, but expected is {}, ignore it", new Object[]{streamSplitMetaEvent.getSplitId(), Integer.valueOf(metaGroupId), Integer.valueOf(nextMetaGroupId)});
        }
        requestStreamSplitMetaIfNeeded(this.uncompletedStreamSplits.get(streamSplit.splitId()));
    }

    private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
        String splitId = streamSplit.splitId();
        if (streamSplit.isCompletedSplit()) {
            LOG.info("The meta of stream split {} has been collected success", splitId);
            addSplits(Collections.singletonList(streamSplit));
        } else {
            this.context.sendSourceEventToCoordinator(new StreamSplitMetaRequestEvent(splitId, getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize()), streamSplit.getTotalFinishedSplitSize()));
        }
    }

    protected void updateStreamSplitFinishedSplitsSize(LatestFinishedSplitsNumberEvent latestFinishedSplitsNumberEvent) {
        if (this.suspendedStreamSplit == null) {
            LOG.warn("Unexpected event {}, this should not happen.", latestFinishedSplitsNumberEvent);
            return;
        }
        StreamSplit normalStreamSplit = StreamSplit.toNormalStreamSplit(this.suspendedStreamSplit, latestFinishedSplitsNumberEvent.getLatestFinishedSplitsNumber());
        this.suspendedStreamSplit = null;
        addSplits(Collections.singletonList(normalStreamSplit), false);
        this.context.sendSourceEventToCoordinator(new StreamSplitUpdateAckEvent());
        LOG.info("Source reader {} notifies enumerator that stream split has been updated.", Integer.valueOf(this.subtaskId));
        this.incrementalSourceReaderContext.wakeupSuspendedStreamSplitReader();
        LOG.info("Source reader {} wakes up suspended stream reader as stream split has been updated.", Integer.valueOf(this.subtaskId));
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (this.finishedUnackedSplits.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (SnapshotSplit snapshotSplit : this.finishedUnackedSplits.values()) {
            hashMap.put(snapshotSplit.splitId(), snapshotSplit.getHighWatermark());
        }
        this.context.sendSourceEventToCoordinator(new FinishedSnapshotSplitsReportEvent(hashMap));
        LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.", Integer.valueOf(this.subtaskId), hashMap);
    }

    public static int getNextMetaGroupId(int i, int i2) {
        Preconditions.checkState(i2 > 0);
        return i / i2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceSplitBase toSplitType(String str, SourceSplitState sourceSplitState) {
        return sourceSplitState.toSourceSplit();
    }

    private void logCurrentStreamOffsets(List<SourceSplitBase> list, long j) {
        if (LOG.isInfoEnabled()) {
            for (SourceSplitBase sourceSplitBase : list) {
                if (!sourceSplitBase.isStreamSplit()) {
                    return;
                }
                LOG.info("Stream split offset on checkpoint {}: {}", Long.valueOf(j), sourceSplitBase.asStreamSplit().getStartingOffset());
            }
        }
    }

    @VisibleForTesting
    public Map<String, SnapshotSplit> getFinishedUnackedSplits() {
        return this.finishedUnackedSplits;
    }
}
