package com.alibaba.ververica.cdc.debezium.internal;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.connector.SnapshotRecord;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.class */
public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    private static final Logger LOG;
    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
    private final SourceFunction.SourceContext<T> sourceContext;
    private final Object checkpointLock;
    private final DebeziumDeserializationSchema<T> deserialization;
    private final ErrorReporter errorReporter;
    private final String heartbeatTopicPrefix;
    private boolean isInDbSnapshotPhase;
    private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
    static final /* synthetic */ boolean $assertionsDisabled;
    private boolean lockHold = false;
    private volatile long messageTimestamp = 0;
    private volatile long processTime = 0;
    private volatile long fetchDelay = 0;
    private volatile long emitDelay = 0;
    private final DebeziumChangeConsumer<T>.DebeziumCollector debeziumCollector = new DebeziumCollector();
    private final DebeziumOffset debeziumOffset = new DebeziumOffset();
    private final DebeziumOffsetSerializer stateSerializer = DebeziumOffsetSerializer.INSTANCE;

    /* loaded from: input_file:com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer$DebeziumCollector.class */
    private class DebeziumCollector implements Collector<T> {
        private final Queue<T> records;

        private DebeziumCollector() {
            this.records = new ArrayDeque();
        }

        public void collect(T t) {
            this.records.add(t);
        }

        public void close() {
        }
    }

    public DebeziumChangeConsumer(SourceFunction.SourceContext<T> sourceContext, DebeziumDeserializationSchema<T> debeziumDeserializationSchema, boolean z, ErrorReporter errorReporter, String str) {
        this.sourceContext = sourceContext;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.deserialization = debeziumDeserializationSchema;
        this.isInDbSnapshotPhase = z;
        this.heartbeatTopicPrefix = str;
        this.errorReporter = errorReporter;
    }

    @Override // io.debezium.engine.DebeziumEngine.ChangeConsumer
    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) throws InterruptedException {
        this.currentCommitter = recordCommitter;
        this.processTime = System.currentTimeMillis();
        try {
            Iterator<ChangeEvent<SourceRecord, SourceRecord>> it = list.iterator();
            while (it.hasNext()) {
                SourceRecord value = it.next().value();
                updateMessageTimestamp(value);
                this.fetchDelay = this.processTime - this.messageTimestamp;
                if (isHeartbeatEvent(value)) {
                    synchronized (this.checkpointLock) {
                        this.debeziumOffset.setSourcePartition(value.sourcePartition());
                        this.debeziumOffset.setSourceOffset(value.sourceOffset());
                    }
                } else {
                    this.deserialization.deserialize(value, this.debeziumCollector);
                    if (this.isInDbSnapshotPhase) {
                        if (!this.lockHold) {
                            MemoryUtils.UNSAFE.monitorEnter(this.checkpointLock);
                            this.lockHold = true;
                            LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
                        }
                        if (!isSnapshotRecord(value)) {
                            MemoryUtils.UNSAFE.monitorExit(this.checkpointLock);
                            this.isInDbSnapshotPhase = false;
                            LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
                        }
                    }
                    emitRecordsUnderCheckpointLock(((DebeziumCollector) this.debeziumCollector).records, value.sourcePartition(), value.sourceOffset());
                }
            }
        } catch (Exception e) {
            LOG.error("Error happens when consuming change messages.", e);
            this.errorReporter.reportError(e);
        }
    }

    private void updateMessageTimestamp(SourceRecord sourceRecord) {
        Long int64;
        Schema valueSchema = sourceRecord.valueSchema();
        Struct struct = (Struct) sourceRecord.value();
        if (valueSchema.field("source") == null) {
            return;
        }
        Struct struct2 = struct.getStruct("source");
        if (struct2.schema().field("ts_ms") == null || (int64 = struct2.getInt64("ts_ms")) == null) {
            return;
        }
        this.messageTimestamp = int64.longValue();
    }

    private boolean isHeartbeatEvent(SourceRecord sourceRecord) {
        String str = sourceRecord.topic();
        return str != null && str.startsWith(this.heartbeatTopicPrefix);
    }

    private boolean isSnapshotRecord(SourceRecord sourceRecord) {
        Struct struct = (Struct) sourceRecord.value();
        if (struct != null) {
            return SnapshotRecord.TRUE == SnapshotRecord.fromSource(struct.getStruct("source"));
        }
        return false;
    }

    private void emitRecordsUnderCheckpointLock(Queue<T> queue, Map<String, ?> map, Map<String, ?> map2) throws InterruptedException {
        if (this.isInDbSnapshotPhase) {
            emitRecords(queue, map, map2);
            return;
        }
        synchronized (this.checkpointLock) {
            emitRecords(queue, map, map2);
        }
    }

    private void emitRecords(Queue<T> queue, Map<String, ?> map, Map<String, ?> map2) {
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            T poll = queue.poll();
            if (poll == null) {
                this.debeziumOffset.setSourcePartition(map);
                this.debeziumOffset.setSourceOffset(map2);
                return;
            } else {
                this.emitDelay = currentTimeMillis - this.messageTimestamp;
                this.sourceContext.collect(poll);
            }
        }
    }

    public byte[] snapshotCurrentState() throws Exception {
        if (!$assertionsDisabled && !Thread.holdsLock(this.checkpointLock)) {
            throw new AssertionError();
        }
        if (this.debeziumOffset.sourceOffset == null || this.debeziumOffset.sourcePartition == null) {
            return null;
        }
        return this.stateSerializer.serialize(this.debeziumOffset);
    }

    public void commitOffset(DebeziumOffset debeziumOffset) throws InterruptedException {
        if (this.currentCommitter == null) {
            LOG.info("commitOffset() called on Debezium ChangeConsumer which doesn't receive records yet.");
            return;
        }
        SourceRecord sourceRecord = new SourceRecord(debeziumOffset.sourcePartition, adjustSourceOffset(debeziumOffset.sourceOffset), "DUMMY", Schema.BOOLEAN_SCHEMA, true);
        this.currentCommitter.markProcessed(new EmbeddedEngineChangeEvent(null, sourceRecord, sourceRecord));
        this.currentCommitter.markBatchFinished();
    }

    public long getFetchDelay() {
        return this.fetchDelay;
    }

    public long getEmitDelay() {
        return this.emitDelay;
    }

    public long getIdleTime() {
        return System.currentTimeMillis() - this.processTime;
    }

    private Map<String, Object> adjustSourceOffset(Map<String, Object> map) {
        if (map.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
            map.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.valueOf(Long.parseLong(map.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString())));
        }
        if (map.containsKey(LAST_COMMIT_LSN_KEY)) {
            map.put(LAST_COMMIT_LSN_KEY, Long.valueOf(Long.parseLong(map.get(LAST_COMMIT_LSN_KEY).toString())));
        }
        return map;
    }

    static {
        $assertionsDisabled = !DebeziumChangeConsumer.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
    }
}
