package org.apache.flink.cdc.connectors.base.source.reader.external;

import java.util.ArrayList;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/base/source/reader/external/AbstractScanFetchTask.class */
public abstract class AbstractScanFetchTask implements FetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractScanFetchTask.class);
    protected final SnapshotSplit snapshotSplit;
    protected volatile boolean taskRunning = false;
    private SnapshotPhaseHooks snapshotPhaseHooks = SnapshotPhaseHooks.empty();

    public AbstractScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask
    public void execute(FetchTask.Context context) throws Exception {
        LOG.info("Execute ScanFetchTask for split: {}", this.snapshotSplit);
        DataSourceDialect dataSourceDialect = context.getDataSourceDialect();
        SourceConfig sourceConfig = context.getSourceConfig();
        this.taskRunning = true;
        if (this.snapshotPhaseHooks.getPreLowWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPreLowWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        Offset displayCurrentOffset = dataSourceDialect.displayCurrentOffset(sourceConfig);
        LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", displayCurrentOffset, this.snapshotSplit);
        dispatchLowWaterMarkEvent(context, this.snapshotSplit, displayCurrentOffset);
        if (this.snapshotPhaseHooks.getPostLowWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPostLowWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        LOG.info("Snapshot step 2 - Snapshotting data");
        executeDataSnapshot(context);
        if (this.snapshotPhaseHooks.getPreHighWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPreHighWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        Offset displayCurrentOffset2 = context.getSourceConfig().isSkipSnapshotBackfill() ? displayCurrentOffset : dataSourceDialect.displayCurrentOffset(sourceConfig);
        LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", displayCurrentOffset2, this.snapshotSplit);
        dispatchHighWaterMarkEvent(context, this.snapshotSplit, displayCurrentOffset2);
        if (this.snapshotPhaseHooks.getPostHighWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPostHighWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        StreamSplit createBackfillStreamSplit = createBackfillStreamSplit(displayCurrentOffset, displayCurrentOffset2);
        if (createBackfillStreamSplit.getEndingOffset().isAfter(createBackfillStreamSplit.getStartingOffset())) {
            executeBackfillTask(context, createBackfillStreamSplit);
        } else {
            LOG.info("Skip the backfill {} for split {}: low watermark >= high watermark", createBackfillStreamSplit, this.snapshotSplit);
            dispatchEndWaterMarkEvent(context, createBackfillStreamSplit, createBackfillStreamSplit.getEndingOffset());
        }
        this.taskRunning = false;
    }

    protected StreamSplit createBackfillStreamSplit(Offset offset, Offset offset2) {
        return new StreamSplit(this.snapshotSplit.splitId(), offset, offset2, new ArrayList(), this.snapshotSplit.getTableSchemas(), 0);
    }

    protected abstract void executeBackfillTask(FetchTask.Context context, StreamSplit streamSplit) throws Exception;

    protected abstract void executeDataSnapshot(FetchTask.Context context) throws Exception;

    protected void dispatchLowWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws Exception {
        if (!(context instanceof JdbcSourceFetchTaskContext)) {
            throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
        }
        ((JdbcSourceFetchTaskContext) context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext) context).getPartition().getSourcePartition(), sourceSplitBase, offset, WatermarkKind.LOW);
    }

    protected void dispatchHighWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws Exception {
        if (!(context instanceof JdbcSourceFetchTaskContext)) {
            throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
        }
        ((JdbcSourceFetchTaskContext) context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext) context).getPartition().getSourcePartition(), sourceSplitBase, offset, WatermarkKind.HIGH);
    }

    protected void dispatchEndWaterMarkEvent(FetchTask.Context context, SourceSplitBase sourceSplitBase, Offset offset) throws Exception {
        if (!(context instanceof JdbcSourceFetchTaskContext)) {
            throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
        }
        ((JdbcSourceFetchTaskContext) context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext) context).getPartition().getSourcePartition(), sourceSplitBase, offset, WatermarkKind.END);
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask
    public SnapshotSplit getSplit() {
        return this.snapshotSplit;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask
    public void close() {
        this.taskRunning = false;
    }

    @VisibleForTesting
    public void setSnapshotPhaseHooks(SnapshotPhaseHooks snapshotPhaseHooks) {
        this.snapshotPhaseHooks = snapshotPhaseHooks;
    }
}
