/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.cdc.debezium.internal;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumState;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumStateSerializer;
import com.alibaba.ververica.cdc.debezium.internal.ErrorReporter;
import io.debezium.connector.SnapshotRecord;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Debezium {@link DebeziumEngine.ChangeConsumer} that deserializes every incoming
 * {@link SourceRecord} and forwards the resulting elements to a Flink
 * {@link SourceFunction.SourceContext}.
 *
 * <p>Locking behaviour:
 * <ul>
 *   <li>While the consumer is in the database snapshot phase, it acquires the checkpoint lock
 *       once and keeps holding it across {@link #handleBatch} calls. The lock is released when
 *       the first record that is not flagged as a snapshot record is seen.</li>
 *   <li>After the snapshot phase, each emission (including the update of the tracked source
 *       partition/offset) is performed inside a {@code synchronized} block on the checkpoint
 *       lock.</li>
 * </ul>
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
@Internal
public class DebeziumChangeConsumer<T>
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);

    private final SourceFunction.SourceContext<T> sourceContext;
    /** Lock obtained from the source context; guards emission and offset updates. */
    private final Object checkpointLock;
    private final DebeziumDeserializationSchema<T> deserialization;
    /** Buffers the elements produced by deserializing a single record. */
    private final DebeziumCollector debeziumCollector;
    private final ErrorReporter errorReporter;
    /** Source partition/offset of the most recently emitted record. */
    private final DebeziumState debeziumState;
    private final DebeziumStateSerializer stateSerializer;
    /** True until the first non-snapshot record has been observed. */
    private boolean isInDbSnapshotPhase;
    /** True while this consumer holds the checkpoint lock acquired for the snapshot phase. */
    private boolean lockHold = false;

    /**
     * Creates a consumer bound to the given source context.
     *
     * @param sourceContext       context used to emit elements and to obtain the checkpoint lock
     * @param deserialization     schema that turns a {@link SourceRecord} into elements
     * @param isInDbSnapshotPhase whether the consumer starts in the database snapshot phase
     * @param errorReporter       receives any exception raised while handling a batch
     */
    public DebeziumChangeConsumer(SourceFunction.SourceContext<T> sourceContext, DebeziumDeserializationSchema<T> deserialization, boolean isInDbSnapshotPhase, ErrorReporter errorReporter) {
        this.sourceContext = sourceContext;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.deserialization = deserialization;
        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
        this.debeziumCollector = new DebeziumCollector();
        this.errorReporter = errorReporter;
        this.debeziumState = new DebeziumState();
        this.stateSerializer = new DebeziumStateSerializer();
    }

    /**
     * Deserializes and emits every event of the batch, in order.
     *
     * <p>Any exception is logged and handed to the {@link ErrorReporter} instead of being
     * propagated; the remaining events of the batch are then skipped. If the exception is an
     * {@link InterruptedException}, the thread's interrupt status is restored so the caller can
     * still observe the interruption.
     *
     * @param changeEvents events to process
     * @param committer    not used by this implementation
     *                     (NOTE(review): presumably offsets are tracked via the internal
     *                     {@code DebeziumState} instead - confirm)
     * @throws InterruptedException declared by the interface; this implementation reports
     *                              exceptions through the error reporter rather than throwing
     */
    @Override
    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
        try {
            for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
                SourceRecord record = event.value();
                deserialization.deserialize(record, debeziumCollector);

                if (isInDbSnapshotPhase) {
                    if (!lockHold) {
                        // The lock must outlive this method call, which a synchronized block
                        // cannot express, hence the explicit monitorEnter/monitorExit pair.
                        MemoryUtils.UNSAFE.monitorEnter(checkpointLock);
                        lockHold = true;
                        LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
                    }
                    if (!isSnapshotRecord(record)) {
                        MemoryUtils.UNSAFE.monitorExit(checkpointLock);
                        // Keep the flag in sync with the real lock state.
                        lockHold = false;
                        isInDbSnapshotPhase = false;
                        LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
                    }
                }

                // Emits the buffered elements and records the partition/offset of this record.
                emitRecordsUnderCheckpointLock(debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
            }
        } catch (Exception e) {
            // NOTE(review): if the failure happens while the snapshot-phase lock is held, the
            // lock is not released here - confirm that the job is torn down on reported errors.
            LOG.error("Error happens when consuming change messages.", e);
            errorReporter.reportError(e);
            if (e instanceof InterruptedException) {
                // Do not lose the interruption just because it was reported as an error.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether the record is flagged as a snapshot record.
     *
     * @param record record to inspect
     * @return {@code true} only if the record has a value whose {@code source} struct maps to
     *         {@link SnapshotRecord#TRUE}; {@code false} for a {@code null} value or any other
     *         snapshot marker
     */
    private boolean isSnapshotRecord(SourceRecord record) {
        Struct value = (Struct) record.value();
        if (value == null) {
            return false;
        }
        Struct source = value.getStruct("source");
        return SnapshotRecord.fromSource(source) == SnapshotRecord.TRUE;
    }

    /**
     * Emits the buffered elements and updates the tracked partition/offset.
     *
     * <p>During the snapshot phase the elements are emitted directly; {@link #handleBatch}
     * acquires the checkpoint lock before calling this method in that phase. Otherwise the
     * emission is wrapped in a {@code synchronized} block on the checkpoint lock.
     */
    private void emitRecordsUnderCheckpointLock(Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) throws InterruptedException {
        if (isInDbSnapshotPhase) {
            emitRecords(records, sourcePartition, sourceOffset);
        } else {
            synchronized (checkpointLock) {
                emitRecords(records, sourcePartition, sourceOffset);
            }
        }
    }

    /**
     * Drains the queue into the source context, then stores the given partition and offset in
     * the tracked state.
     */
    private void emitRecords(Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        T record;
        while ((record = records.poll()) != null) {
            sourceContext.collect(record);
        }
        debeziumState.setSourcePartition(sourcePartition);
        debeziumState.setSourceOffset(sourceOffset);
    }

    /**
     * Serializes the currently tracked source partition and offset.
     *
     * <p>The caller is expected to hold the checkpoint lock; this is verified only when
     * assertions are enabled.
     *
     * @return the serialized state, or {@code null} if no partition or no offset has been
     *         recorded yet
     * @throws Exception if the state serializer fails
     */
    public byte[] snapshotCurrentState() throws Exception {
        assert (Thread.holdsLock(checkpointLock));
        if (debeziumState.sourceOffset == null || debeziumState.sourcePartition == null) {
            return null;
        }
        return stateSerializer.serialize(debeziumState);
    }

    /** Collector that queues deserialized elements until they are emitted. */
    private class DebeziumCollector
            implements Collector<T> {
        private final Queue<T> records = new ArrayDeque<>();

        private DebeziumCollector() {
        }

        /** Appends the element to the pending queue. */
        public void collect(T record) {
            records.add(record);
        }

        /** No resources to release. */
        public void close() {
        }
    }
}

