package com.ververica.cdc.debezium;

import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import com.ververica.cdc.debezium.internal.Handover;
import com.ververica.cdc.debezium.internal.SchemaRecord;
import com.ververica.cdc.debezium.utils.DatabaseHistoryUtil;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:com/ververica/cdc/debezium/DebeziumSourceFunction.class */
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
    private static final long serialVersionUID = -5808108641062931623L;
    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
    public static final String OFFSETS_STATE_NAME = "offset-states";
    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
    private final DebeziumDeserializationSchema<T> deserializer;
    private final Properties properties;

    @Nullable
    private final DebeziumOffset specificOffset;
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    private volatile boolean debeziumStarted = false;
    private final Validator validator;
    private volatile transient String restoredOffsetState;
    private transient ListState<byte[]> offsetState;
    private transient ListState<String> schemaRecordsState;
    private transient ExecutorService executor;
    private transient DebeziumEngine<?> engine;
    private transient String engineInstanceName;
    private transient DebeziumChangeConsumer changeConsumer;
    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
    private transient Handover handover;

    public DebeziumSourceFunction(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, Properties properties, @Nullable DebeziumOffset debeziumOffset, Validator validator) {
        this.deserializer = debeziumDeserializationSchema;
        this.properties = properties;
        this.specificOffset = debeziumOffset;
        this.validator = validator;
    }

    public void open(Configuration configuration) throws Exception {
        this.validator.validate();
        super.open(configuration);
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("debezium-engine").build());
        this.handover = new Handover();
        this.changeConsumer = new DebeziumChangeConsumer(this.handover);
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
        this.offsetState = operatorStateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
        this.schemaRecordsState = operatorStateStore.getUnionListState(new ListStateDescriptor(HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
        if (functionInitializationContext.isRestored()) {
            restoreOffsetState();
            restoreHistoryRecordsState();
        } else if (this.specificOffset == null) {
            LOG.info("Consumer subtask {} has no restore state.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        } else {
            this.restoredOffsetState = new String(DebeziumOffsetSerializer.INSTANCE.serialize(this.specificOffset), StandardCharsets.UTF_8);
            LOG.info("Consumer subtask {} starts to read from specified offset {}.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.restoredOffsetState);
        }
    }

    private void restoreOffsetState() throws Exception {
        for (byte[] bArr : (Iterable) this.offsetState.get()) {
            if (this.restoredOffsetState != null) {
                throw new RuntimeException("Debezium Source only support single task, however, this is restored from multiple tasks.");
            }
            this.restoredOffsetState = new String(bArr, StandardCharsets.UTF_8);
        }
        LOG.info("Consumer subtask {} restored offset state: {}.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.restoredOffsetState);
    }

    private void restoreHistoryRecordsState() throws Exception {
        DocumentReader defaultReader = DocumentReader.defaultReader();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        int i = 0;
        boolean z = true;
        for (String str : (Iterable) this.schemaRecordsState.get()) {
            if (z) {
                this.engineInstanceName = str;
                z = false;
            } else {
                concurrentLinkedQueue.add(new SchemaRecord(defaultReader.read(str)));
                i++;
            }
        }
        if (this.engineInstanceName != null) {
            DatabaseHistoryUtil.registerHistory(this.engineInstanceName, concurrentLinkedQueue);
        }
        LOG.info("Consumer subtask {} restored history records state: {} with {} records.", new Object[]{Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.engineInstanceName, Integer.valueOf(i)});
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.handover.hasError()) {
            LOG.debug("snapshotState() called on closed source");
            throw new FlinkRuntimeException("Call snapshotState() on closed source, checkpoint failed.");
        }
        snapshotOffsetState(functionSnapshotContext.getCheckpointId());
        snapshotHistoryRecordsState();
    }

    private void snapshotOffsetState(long j) throws Exception {
        this.offsetState.clear();
        DebeziumChangeFetcher<T> debeziumChangeFetcher = this.debeziumChangeFetcher;
        byte[] bArr = null;
        if (debeziumChangeFetcher != null) {
            byte[] snapshotCurrentState = debeziumChangeFetcher.snapshotCurrentState();
            bArr = (snapshotCurrentState != null || this.restoredOffsetState == null) ? snapshotCurrentState : this.restoredOffsetState.getBytes(StandardCharsets.UTF_8);
        } else if (this.restoredOffsetState != null) {
            bArr = this.restoredOffsetState.getBytes(StandardCharsets.UTF_8);
        }
        if (bArr != null) {
            this.offsetState.add(bArr);
            this.pendingOffsetsToCommit.put(Long.valueOf(j), bArr);
            while (this.pendingOffsetsToCommit.size() > 100) {
                this.pendingOffsetsToCommit.remove(0);
            }
        }
    }

    private void snapshotHistoryRecordsState() throws Exception {
        this.schemaRecordsState.clear();
        if (this.engineInstanceName != null) {
            this.schemaRecordsState.add(this.engineInstanceName);
            Collection<SchemaRecord> retrieveHistory = DatabaseHistoryUtil.retrieveHistory(this.engineInstanceName);
            DocumentWriter defaultWriter = DocumentWriter.defaultWriter();
            Iterator<SchemaRecord> it = retrieveHistory.iterator();
            while (it.hasNext()) {
                this.schemaRecordsState.add(defaultWriter.write(it.next().toDocument()));
            }
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        this.properties.setProperty("name", "engine");
        this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
        if (this.restoredOffsetState != null) {
            this.properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, this.restoredOffsetState);
        }
        this.properties.setProperty("key.converter.schemas.enable", "false");
        this.properties.setProperty("value.converter.schemas.enable", "false");
        this.properties.setProperty("include.schema.changes", "false");
        this.properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
        this.properties.setProperty("tombstones.on.delete", "false");
        if (this.engineInstanceName == null) {
            this.engineInstanceName = UUID.randomUUID().toString();
        }
        this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
        this.properties.setProperty("database.history", determineDatabase().getCanonicalName());
        this.debeziumChangeFetcher = new DebeziumChangeFetcher<>(sourceContext, this.deserializer, this.restoredOffsetState == null, this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()), this.handover);
        this.engine = DebeziumEngine.create(Connect.class).using2(this.properties).notifying2(this.changeConsumer).using2(OffsetCommitPolicy.always()).using2((z, str, th) -> {
            if (z) {
                this.handover.close();
            } else {
                this.handover.reportError(th);
            }
        }).build2();
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> {
            return Long.valueOf(this.debeziumChangeFetcher.getFetchDelay());
        });
        metricGroup.gauge("currentEmitEventTimeLag", () -> {
            return Long.valueOf(this.debeziumChangeFetcher.getEmitDelay());
        });
        metricGroup.gauge("sourceIdleTime", () -> {
            return Long.valueOf(this.debeziumChangeFetcher.getIdleTime());
        });
        this.debeziumChangeFetcher.runFetchLoop();
    }

    public void notifyCheckpointComplete(long j) {
        if (!this.debeziumStarted) {
            LOG.debug("notifyCheckpointComplete() called when engine is not started.");
            return;
        }
        if (this.debeziumChangeFetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }
        try {
            int indexOf = this.pendingOffsetsToCommit.indexOf(Long.valueOf(j));
            if (indexOf == -1) {
                LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Long.valueOf(j));
                return;
            }
            byte[] bArr = (byte[]) this.pendingOffsetsToCommit.remove(indexOf);
            for (int i = 0; i < indexOf; i++) {
                this.pendingOffsetsToCommit.remove(0);
            }
            if (bArr == null || bArr.length == 0) {
                LOG.debug("Consumer subtask {} has empty checkpoint state.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            } else {
                this.changeConsumer.commitOffset(DebeziumOffsetSerializer.INSTANCE.deserialize(bArr));
            }
        } catch (Exception e) {
            LOG.warn("Ignore error when committing offset to database.", e);
        }
    }

    public void cancel() {
        shutdownEngine();
        this.debeziumChangeFetcher.close();
    }

    public void close() throws Exception {
        cancel();
        if (this.executor != null) {
            this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
        super.close();
    }

    private void shutdownEngine() {
        try {
            try {
                if (this.engine != null) {
                    this.engine.close();
                }
                if (this.executor != null) {
                    this.executor.shutdownNow();
                }
                this.debeziumStarted = false;
                if (this.handover != null) {
                    this.handover.close();
                }
            } catch (IOException e) {
                ExceptionUtils.rethrow(e);
                if (this.executor != null) {
                    this.executor.shutdownNow();
                }
                this.debeziumStarted = false;
                if (this.handover != null) {
                    this.handover.close();
                }
            }
        } catch (Throwable th) {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
            this.debeziumStarted = false;
            if (this.handover != null) {
                this.handover.close();
            }
            throw th;
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    @VisibleForTesting
    public LinkedMap getPendingOffsetsToCommit() {
        return this.pendingOffsetsToCommit;
    }

    @VisibleForTesting
    public boolean getDebeziumStarted() {
        return this.debeziumStarted;
    }

    private Class<?> determineDatabase() {
        boolean isCompatible = FlinkDatabaseHistory.isCompatible(DatabaseHistoryUtil.retrieveHistory(this.engineInstanceName));
        if (LEGACY_IMPLEMENTATION_VALUE.equals(this.properties.get(LEGACY_IMPLEMENTATION_KEY))) {
            if (isCompatible) {
                return FlinkDatabaseHistory.class;
            }
            throw new IllegalStateException("The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option.");
        }
        if (FlinkDatabaseSchemaHistory.isCompatible(DatabaseHistoryUtil.retrieveHistory(this.engineInstanceName))) {
            return FlinkDatabaseSchemaHistory.class;
        }
        if (isCompatible) {
            return FlinkDatabaseHistory.class;
        }
        throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
    }

    @VisibleForTesting
    public String getEngineInstanceName() {
        return this.engineInstanceName;
    }
}
