package com.ververica.cdc.connectors.mysql.debezium.reader;

import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.util.SchemaNameAdjuster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.class */
public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
    private final StatefulTaskContext statefulTaskContext;
    private final ExecutorService executorService;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile Throwable readException;
    private MySqlSnapshotSplitReadTask splitSnapshotReadTask;
    private MySqlSnapshotSplit currentSnapshotSplit;
    private SchemaNameAdjuster nameAdjuster;
    private static final long READER_CLOSE_TIMEOUT = 30;
    private volatile boolean currentTaskRunning = false;
    public AtomicBoolean hasNextElement = new AtomicBoolean(false);
    public AtomicBoolean reachEnd = new AtomicBoolean(false);

    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader$SnapshotBinlogSplitChangeEventSourceContextImpl.class */
    public class SnapshotBinlogSplitChangeEventSourceContextImpl implements ChangeEventSource.ChangeEventSourceContext {
        public SnapshotBinlogSplitChangeEventSourceContextImpl() {
        }

        public void finished() {
            SnapshotSplitReader.this.currentTaskRunning = false;
        }

        public boolean isRunning() {
            return SnapshotSplitReader.this.currentTaskRunning;
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader$SnapshotSplitChangeEventSourceContextImpl.class */
    public static class SnapshotSplitChangeEventSourceContextImpl implements ChangeEventSource.ChangeEventSourceContext {
        private BinlogOffset lowWatermark;
        private BinlogOffset highWatermark;

        public BinlogOffset getLowWatermark() {
            return this.lowWatermark;
        }

        public void setLowWatermark(BinlogOffset binlogOffset) {
            this.lowWatermark = binlogOffset;
        }

        public BinlogOffset getHighWatermark() {
            return this.highWatermark;
        }

        public void setHighWatermark(BinlogOffset binlogOffset) {
            this.highWatermark = binlogOffset;
        }

        public boolean isRunning() {
            return (this.lowWatermark == null || this.highWatermark == null) ? false : true;
        }
    }

    public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int i) {
        this.statefulTaskContext = statefulTaskContext;
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + i).setUncaughtExceptionHandler((thread, th) -> {
            setReadException(th);
        }).build());
    }

    @Override // com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader
    public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        this.statefulTaskContext.configure(this.currentSnapshotSplit);
        this.queue = this.statefulTaskContext.getQueue();
        this.nameAdjuster = this.statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
        this.splitSnapshotReadTask = new MySqlSnapshotSplitReadTask(this.statefulTaskContext.getConnectorConfig(), this.statefulTaskContext.getSnapshotChangeEventSourceMetrics(), this.statefulTaskContext.getDatabaseSchema(), this.statefulTaskContext.getConnection(), this.statefulTaskContext.getDispatcher(), this.statefulTaskContext.getTopicSelector(), this.statefulTaskContext.getSnapshotReceiver(), StatefulTaskContext.getClock(), this.currentSnapshotSplit);
        this.executorService.execute(() -> {
            try {
                this.currentTaskRunning = true;
                SnapshotSplitChangeEventSourceContextImpl snapshotSplitChangeEventSourceContextImpl = new SnapshotSplitChangeEventSourceContextImpl();
                SnapshotResult<MySqlOffsetContext> execute = this.splitSnapshotReadTask.execute(snapshotSplitChangeEventSourceContextImpl, this.statefulTaskContext.getMySqlPartition(), this.statefulTaskContext.getOffsetContext());
                MySqlBinlogSplit createBackfillBinlogSplit = createBackfillBinlogSplit(snapshotSplitChangeEventSourceContextImpl);
                if (!createBackfillBinlogSplit.getEndingOffset().isAfter(createBackfillBinlogSplit.getStartingOffset())) {
                    dispatchBinlogEndEvent(createBackfillBinlogSplit);
                    this.currentTaskRunning = false;
                } else {
                    if (execute.isCompletedOrSkipped()) {
                        createBackfillBinlogReadTask(createBackfillBinlogSplit).execute(new SnapshotBinlogSplitChangeEventSourceContextImpl(), this.statefulTaskContext.getMySqlPartition(), new MySqlOffsetContext.Loader(this.statefulTaskContext.getConnectorConfig()).load(createBackfillBinlogSplit.getStartingOffset().getOffset()));
                    } else {
                        setReadException(new IllegalStateException(String.format("Read snapshot for mysql split %s fail", this.currentSnapshotSplit)));
                    }
                }
            } catch (Throwable th) {
                setReadException(th);
            }
        });
    }

    private MySqlBinlogSplit createBackfillBinlogSplit(SnapshotSplitChangeEventSourceContextImpl snapshotSplitChangeEventSourceContextImpl) {
        return new MySqlBinlogSplit(this.currentSnapshotSplit.splitId(), snapshotSplitChangeEventSourceContextImpl.getLowWatermark(), snapshotSplitChangeEventSourceContextImpl.getHighWatermark(), new ArrayList(), this.currentSnapshotSplit.getTableSchemas(), 0);
    }

    private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(MySqlBinlogSplit mySqlBinlogSplit) {
        return new MySqlBinlogSplitReadTask(new MySqlConnectorConfig(this.statefulTaskContext.getSourceConfig().getDbzConfiguration().edit().with("table.include.list", this.currentSnapshotSplit.getTableId().toString()).with(Heartbeat.HEARTBEAT_INTERVAL, 0).build()), this.statefulTaskContext.getConnection(), this.statefulTaskContext.getDispatcher(), this.statefulTaskContext.getSignalEventDispatcher(), this.statefulTaskContext.getErrorHandler(), StatefulTaskContext.getClock(), this.statefulTaskContext.getTaskContext(), this.statefulTaskContext.getStreamingChangeEventSourceMetrics(), mySqlBinlogSplit, event -> {
            return true;
        });
    }

    private void dispatchBinlogEndEvent(MySqlBinlogSplit mySqlBinlogSplit) throws InterruptedException {
        new SignalEventDispatcher(this.statefulTaskContext.getOffsetContext().getOffset(), this.statefulTaskContext.getTopicSelector().getPrimaryTopic(), this.statefulTaskContext.getDispatcher().getQueue()).dispatchWatermarkEvent(mySqlBinlogSplit, mySqlBinlogSplit.getEndingOffset(), SignalEventDispatcher.WatermarkKind.BINLOG_END);
    }

    @Override // com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader
    public boolean isFinished() {
        return this.currentSnapshotSplit == null || !(this.currentTaskRunning || this.hasNextElement.get() || !this.reachEnd.get());
    }

    @Override // com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader
    @Nullable
    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
        checkReadException();
        if (!this.hasNextElement.get()) {
            this.reachEnd.compareAndSet(false, true);
            return null;
        }
        boolean z = false;
        boolean z2 = false;
        SourceRecord sourceRecord = null;
        SourceRecord sourceRecord2 = null;
        HashMap hashMap = new HashMap();
        while (!z2) {
            checkReadException();
            Iterator it = this.queue.poll().iterator();
            while (true) {
                if (it.hasNext()) {
                    SourceRecord record = ((DataChangeEvent) it.next()).getRecord();
                    if (sourceRecord == null) {
                        sourceRecord = record;
                        assertLowWatermark(sourceRecord);
                    } else if (sourceRecord2 != null || !RecordUtils.isHighWatermarkEvent(record)) {
                        if (z && RecordUtils.isEndWatermarkEvent(record)) {
                            z2 = true;
                            break;
                        }
                        if (z) {
                            RecordUtils.upsertBinlog(hashMap, record, this.currentSnapshotSplit.getSplitKeyType(), this.nameAdjuster, this.currentSnapshotSplit.getSplitStart(), this.currentSnapshotSplit.getSplitEnd());
                        } else if (record.key() != null) {
                            hashMap.put((Struct) record.key(), Collections.singletonList(record));
                        } else {
                            ((List) hashMap.computeIfAbsent((Struct) record.value(), struct -> {
                                return new LinkedList();
                            })).add(record);
                        }
                    } else {
                        sourceRecord2 = record;
                        z = true;
                    }
                }
            }
        }
        this.hasNextElement.set(false);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sourceRecord);
        arrayList.addAll(RecordUtils.formatMessageTimestamp((Collection) hashMap.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList())));
        arrayList.add(sourceRecord2);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new SourceRecords(arrayList));
        return arrayList2.iterator();
    }

    private void checkReadException() {
        if (this.readException != null) {
            throw new FlinkRuntimeException(String.format("Read split %s error due to %s.", this.currentSnapshotSplit, this.readException.getMessage()), this.readException);
        }
    }

    private void assertLowWatermark(SourceRecord sourceRecord) {
        Preconditions.checkState(RecordUtils.isLowWatermarkEvent(sourceRecord), String.format("The first record should be low watermark signal event, but actual is %s", sourceRecord));
    }

    private void setReadException(Throwable th) {
        this.currentTaskRunning = false;
        LOG.error(String.format("Execute snapshot read task for mysql split %s fail", this.currentSnapshotSplit), th);
        if (this.readException == null) {
            this.readException = th;
        } else {
            this.readException.addSuppressed(th);
        }
    }

    @Override // com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader
    public void close() {
        try {
            if (this.statefulTaskContext.getConnection() != null) {
                this.statefulTaskContext.getConnection().close();
            }
            if (this.statefulTaskContext.getBinaryLogClient() != null) {
                this.statefulTaskContext.getBinaryLogClient().disconnect();
            }
            if (this.statefulTaskContext.getDatabaseSchema() != null) {
                this.statefulTaskContext.getDatabaseSchema().close();
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
                if (!this.executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
                    LOG.warn("Failed to close the snapshot split reader in {} seconds.", Long.valueOf(READER_CLOSE_TIMEOUT));
                }
            }
        } catch (Exception e) {
            LOG.error("Close snapshot reader error", e);
        }
    }

    @VisibleForTesting
    public ExecutorService getExecutorService() {
        return this.executorService;
    }
}
