package com.ververica.cdc.connectors.mysql.debezium.task;

import com.github.shyiko.mysql.binlog.event.Event;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.util.Map;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.class */
public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
    private final MySqlBinlogSplit binlogSplit;
    private final EventDispatcherImpl<TableId> eventDispatcher;
    private final SignalEventDispatcher signalEventDispatcher;
    private final ErrorHandler errorHandler;
    private final Predicate<Event> eventFilter;
    private ChangeEventSource.ChangeEventSourceContext context;

    public MySqlBinlogSplitReadTask(MySqlConnectorConfig mySqlConnectorConfig, MySqlConnection mySqlConnection, EventDispatcherImpl<TableId> eventDispatcherImpl, SignalEventDispatcher signalEventDispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext mySqlTaskContext, MySqlStreamingChangeEventSourceMetrics mySqlStreamingChangeEventSourceMetrics, MySqlBinlogSplit mySqlBinlogSplit, Predicate<Event> predicate) {
        super(mySqlConnectorConfig, mySqlConnection, eventDispatcherImpl, errorHandler, clock, mySqlTaskContext, mySqlStreamingChangeEventSourceMetrics);
        this.binlogSplit = mySqlBinlogSplit;
        this.eventDispatcher = eventDispatcherImpl;
        this.errorHandler = errorHandler;
        this.signalEventDispatcher = signalEventDispatcher;
        this.eventFilter = predicate;
    }

    @Override // io.debezium.connector.mysql.MySqlStreamingChangeEventSource
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MySqlPartition mySqlPartition, MySqlOffsetContext mySqlOffsetContext) throws InterruptedException {
        this.context = changeEventSourceContext;
        super.execute(changeEventSourceContext, mySqlPartition, mySqlOffsetContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.mysql.MySqlStreamingChangeEventSource
    public void handleEvent(MySqlPartition mySqlPartition, MySqlOffsetContext mySqlOffsetContext, Event event) {
        if (this.eventFilter.test(event)) {
            super.handleEvent(mySqlPartition, mySqlOffsetContext, event);
            if (isBoundedRead()) {
                BinlogOffset binlogPosition = RecordUtils.getBinlogPosition((Map<String, ?>) mySqlOffsetContext.getOffset());
                if (binlogPosition.isAtOrAfter(this.binlogSplit.getEndingOffset())) {
                    try {
                        this.signalEventDispatcher.dispatchWatermarkEvent(this.binlogSplit, binlogPosition, SignalEventDispatcher.WatermarkKind.BINLOG_END);
                    } catch (InterruptedException e) {
                        LOG.error("Send signal event error.", e);
                        this.errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog signal event", e));
                    }
                    ((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl) this.context).finished();
                }
            }
        }
    }

    private boolean isBoundedRead() {
        return !BinlogOffsetUtils.isNonStoppingOffset(this.binlogSplit.getEndingOffset());
    }
}
