package com.ververica.cdc.connectors.mysql.debezium.task.context;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.GtidUtils;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.class */
/**
 * Holds the Debezium-side objects used to read a single {@link MySqlSplit}: the MySQL connection
 * and binlog client supplied at construction time, plus the schema, offset context, change event
 * queue, dispatchers, metrics and error handler that {@link #configure(MySqlSplit)} (re)creates
 * for each split.
 *
 * <p>All getters for the per-split objects return {@code null} until {@link
 * #configure(MySqlSplit)} has been called.
 */
public class StatefulTaskContext {
    private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
    private static final Clock CLOCK = Clock.SYSTEM;

    /** Added to the configured split size to obtain the queue capacity for snapshot splits. */
    private static final int DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN = 1024;

    private final MySqlSourceConfig sourceConfig;
    private final MySqlConnectorConfig connectorConfig;
    private final MySqlConnection connection;
    private final BinaryLogClient binaryLogClient;
    private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
    private final MySqlEventMetadataProvider metadataProvider = new MySqlEventMetadataProvider();

    // Everything below is (re)built by configure(MySqlSplit).
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private MySqlPartition mySqlPartition;
    private TopicSelector<TableId> topicSelector;
    private SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
    private StreamingChangeEventSourceMetrics<MySqlPartition> streamingChangeEventSourceMetrics;
    private EventDispatcherImpl<TableId> dispatcher;
    private EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver;
    private SignalEventDispatcher signalEventDispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private ErrorHandler errorHandler;

    /**
     * Creates a context around an existing connection and binlog client.
     *
     * @param mySqlSourceConfig source configuration; its connector config is cached as well
     * @param binaryLogClient binlog client handed to the task context built in {@link
     *     #configure(MySqlSplit)}
     * @param mySqlConnection connection used for case-sensitivity, GTID and binlog file lookups
     */
    public StatefulTaskContext(MySqlSourceConfig mySqlSourceConfig, BinaryLogClient binaryLogClient, MySqlConnection mySqlConnection) {
        this.sourceConfig = mySqlSourceConfig;
        this.connectorConfig = mySqlSourceConfig.getMySqlConnectorConfig();
        this.binaryLogClient = binaryLogClient;
        this.connection = mySqlConnection;
    }

    /**
     * Builds all per-split state for the given split, closing the database schema left over from a
     * previous call first.
     *
     * @param mySqlSplit the split about to be read
     * @throws IllegalStateException if the binlog position the split starts from is no longer
     *     available on the server
     */
    public void configure(MySqlSplit mySqlSplit) {
        final boolean tableIdCaseSensitive = connection.isTableIdCaseSensitive();
        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);

        // Register the split's table schemas under the configured history instance name before
        // the database schema is created and recovered below.
        EmbeddedFlinkDatabaseHistory.registerHistory(
                sourceConfig.getDbzConfiguration().getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
                mySqlSplit.getTableSchemas().values());

        // Release the schema of the previously configured split, if any.
        if (databaseSchema != null) {
            databaseSchema.close();
        }
        this.databaseSchema = DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseSensitive);

        this.mySqlPartition = new MySqlPartition(connectorConfig.getLogicalName());
        this.offsetContext = loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit);
        validateAndLoadDatabaseHistory(offsetContext, databaseSchema);

        this.taskContext = new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);

        // Snapshot splits size the queue from the split size plus a fixed allowance; binlog
        // splits use the connector's configured queue size.
        final int maxQueueSize =
                mySqlSplit.isSnapshotSplit()
                        ? sourceConfig.getSplitSize() + DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN
                        : connectorConfig.getMaxQueueSize();
        this.queue =
                new ChangeEventQueue.Builder<DataChangeEvent>()
                        .pollInterval(connectorConfig.getPollInterval())
                        .maxBatchSize(connectorConfig.getMaxBatchSize())
                        .maxQueueSize(maxQueueSize)
                        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                        // Lambda (not a bound method reference) so the field is read when invoked.
                        .loggingContextSupplier(() -> this.taskContext.configureLoggingContext("mysql-cdc-connector-task"))
                        .build();

        this.dispatcher =
                new EventDispatcherImpl<>(
                        connectorConfig,
                        topicSelector,
                        databaseSchema,
                        queue,
                        connectorConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        metadataProvider,
                        schemaNameAdjuster);
        this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
        this.signalEventDispatcher = new SignalEventDispatcher(offsetContext.getOffset(), topicSelector.getPrimaryTopic(), queue);

        final MySqlChangeEventSourceMetricsFactory metricsFactory =
                new MySqlChangeEventSourceMetricsFactory(
                        new MySqlStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider));
        this.snapshotChangeEventSourceMetrics = metricsFactory.getSnapshotMetrics(taskContext, queue, metadataProvider);
        this.streamingChangeEventSourceMetrics = metricsFactory.getStreamingMetrics(taskContext, queue, metadataProvider);

        this.errorHandler = new MySqlErrorHandler(connectorConfig, queue, taskContext, sourceConfig);
    }

    /** Initializes the schema's storage and recovers it for the current partition and offset. */
    private void validateAndLoadDatabaseHistory(MySqlOffsetContext mySqlOffsetContext, MySqlDatabaseSchema mySqlDatabaseSchema) {
        mySqlDatabaseSchema.initializeStorage();
        mySqlDatabaseSchema.recover(Offsets.of(mySqlPartition, mySqlOffsetContext));
    }

    /**
     * Determines the offset to start reading from and verifies the server can still serve it.
     *
     * <p>Snapshot splits start from {@link BinlogOffset#ofEarliest()}; binlog splits start from
     * the split's starting offset resolved through {@link
     * BinlogOffsetUtils#initializeEffectiveOffset}.
     *
     * @param loader loader turning the raw offset map into an offset context
     * @param mySqlSplit the split being configured
     * @return the loaded offset context
     * @throws IllegalStateException if the required binlog data is no longer available
     */
    protected MySqlOffsetContext loadStartingOffsetState(OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
        final BinlogOffset startingOffset;
        if (mySqlSplit.isSnapshotSplit()) {
            startingOffset = BinlogOffset.ofEarliest();
        } else {
            startingOffset = BinlogOffsetUtils.initializeEffectiveOffset(mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
        }
        LOG.info("Starting offset is initialized to {}", startingOffset);

        final MySqlOffsetContext loadedOffset = loader.load(startingOffset.getOffset());
        if (!isBinlogAvailable(loadedOffset)) {
            throw new IllegalStateException("The connector is trying to read binlog starting at " + loadedOffset.getSourceInfo() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return loadedOffset;
    }

    /** Checks availability by GTID set when the offset carries one, otherwise by binlog file name. */
    private boolean isBinlogAvailable(MySqlOffsetContext mySqlOffsetContext) {
        if (mySqlOffsetContext.gtidSet() != null) {
            return checkGtidSet(mySqlOffsetContext);
        }
        return checkBinlogFilename(mySqlOffsetContext);
    }

    /**
     * Verifies that the GTIDs recorded in the offset can still be replicated from the server.
     *
     * @return {@code true} if the offset's GTID set is blank, or if the merged GTID set is
     *     contained in the server's known set and none of the not-yet-processed GTIDs has been
     *     purged; {@code false} otherwise
     */
    private boolean checkGtidSet(MySqlOffsetContext mySqlOffsetContext) {
        final String restoredGtidStr = mySqlOffsetContext.gtidSet();
        if (restoredGtidStr.trim().isEmpty()) {
            // Nothing recorded, so there is nothing that could be missing.
            return true;
        }

        final String serverGtidStr = connection.knownGtidSet();
        if (serverGtidStr == null || serverGtidStr.trim().isEmpty()) {
            LOG.warn("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
            return false;
        }

        final GtidSet serverGtidSet = new GtidSet(serverGtidStr);
        LOG.info("Merging server GTID set {} with restored GTID set {}", serverGtidSet, restoredGtidStr);
        final GtidSet mergedGtidSet = GtidUtils.fixRestoredGtidSet(serverGtidSet, new GtidSet(restoredGtidStr));
        LOG.info("Merged GTID set is {}", mergedGtidSet);

        if (!mergedGtidSet.isContainedWithin(serverGtidSet)) {
            LOG.info("Connector last known GTIDs are {}, but MySQL has {}", mergedGtidSet, serverGtidSet);
            return false;
        }
        LOG.info("MySQL current GTID set {} does contain the GTID set {} required by the connector.", serverGtidSet, mergedGtidSet);

        // GTIDs the server knows about that the connector has not consumed yet ...
        final GtidSet pendingGtidSet = connection.subtractGtidSet(serverGtidSet, mergedGtidSet);
        final GtidSet purgedGtidSet = connection.purgedGtidSet();
        LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
        // ... and the part of them that has not been purged.
        final GtidSet replicableGtidSet = connection.subtractGtidSet(pendingGtidSet, purgedGtidSet);
        LOG.info("GTID set {} known by the server but not processed yet, for replication are available only GTID set {}", pendingGtidSet, replicableGtidSet);

        if (!pendingGtidSet.equals(replicableGtidSet)) {
            LOG.warn("Some of the GTIDs needed to replicate have been already purged");
            return false;
        }
        return true;
    }

    /**
     * Verifies that the binlog file named in the offset is among the files the server reports.
     *
     * @return {@code true} if the offset names no file (null or empty) or the file is listed by
     *     {@link MySqlConnection#availableBinlogFiles()}; {@code false} otherwise
     */
    private boolean checkBinlogFilename(MySqlOffsetContext mySqlOffsetContext) {
        final String requiredFile = mySqlOffsetContext.getSourceInfo().getString(MySqlEventMetadataProvider.BINLOG_FILENAME_OFFSET_KEY);
        if (requiredFile == null || requiredFile.isEmpty()) {
            return true;
        }

        final List<String> availableBinlogFiles = connection.availableBinlogFiles();
        final boolean found = availableBinlogFiles.contains(requiredFile);
        if (found) {
            LOG.info("MySQL has the binlog file '{}' required by the connector", requiredFile);
        } else {
            LOG.info("Connector requires binlog file '{}', but MySQL only has {}", requiredFile, String.join(", ", availableBinlogFiles));
        }
        return found;
    }

    /** Returns the shared clock ({@link Clock#SYSTEM}). */
    public static Clock getClock() {
        return CLOCK;
    }

    /** Returns the source configuration passed to the constructor. */
    public MySqlSourceConfig getSourceConfig() {
        return sourceConfig;
    }

    /** Returns the connector configuration obtained from the source configuration. */
    public MySqlConnectorConfig getConnectorConfig() {
        return connectorConfig;
    }

    /** Returns the MySQL connection passed to the constructor. */
    public MySqlConnection getConnection() {
        return connection;
    }

    /** Returns the binlog client passed to the constructor. */
    public BinaryLogClient getBinaryLogClient() {
        return binaryLogClient;
    }

    /** Returns the database schema created by the last {@link #configure(MySqlSplit)} call. */
    public MySqlDatabaseSchema getDatabaseSchema() {
        return databaseSchema;
    }

    /** Returns the task context created by the last {@link #configure(MySqlSplit)} call. */
    public MySqlTaskContextImpl getTaskContext() {
        return taskContext;
    }

    /** Returns the event dispatcher created by the last {@link #configure(MySqlSplit)} call. */
    public EventDispatcherImpl<TableId> getDispatcher() {
        return dispatcher;
    }

    /** Returns the snapshot receiver obtained from the current dispatcher. */
    public EventDispatcher.SnapshotReceiver<MySqlPartition> getSnapshotReceiver() {
        return snapshotReceiver;
    }

    /** Returns the signal event dispatcher created by the last {@link #configure(MySqlSplit)} call. */
    public SignalEventDispatcher getSignalEventDispatcher() {
        return signalEventDispatcher;
    }

    /** Returns the change event queue created by the last {@link #configure(MySqlSplit)} call. */
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    /** Returns the error handler created by the last {@link #configure(MySqlSplit)} call. */
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    /** Returns the offset context loaded by the last {@link #configure(MySqlSplit)} call. */
    public MySqlOffsetContext getOffsetContext() {
        return offsetContext;
    }

    /** Returns the partition created by the last {@link #configure(MySqlSplit)} call. */
    public MySqlPartition getMySqlPartition() {
        return mySqlPartition;
    }

    /** Returns the topic selector created by the last {@link #configure(MySqlSplit)} call. */
    public TopicSelector<TableId> getTopicSelector() {
        return topicSelector;
    }

    /** Returns the snapshot metrics created by the last {@link #configure(MySqlSplit)} call. */
    public SnapshotChangeEventSourceMetrics<MySqlPartition> getSnapshotChangeEventSourceMetrics() {
        return snapshotChangeEventSourceMetrics;
    }

    /** Returns the streaming metrics created by the last {@link #configure(MySqlSplit)} call. */
    public StreamingChangeEventSourceMetrics<MySqlPartition> getStreamingChangeEventSourceMetrics() {
        return streamingChangeEventSourceMetrics;
    }

    /** Returns the schema name adjuster created together with this context. */
    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return schemaNameAdjuster;
    }

    /**
     * {@link EventMetadataProvider} that reads event metadata from the {@code source} struct of a
     * change event value and the transaction id from the {@link MySqlOffsetContext}.
     */
    public static class MySqlEventMetadataProvider implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";
        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        /**
         * Returns the event time taken from the {@code ts_ms} field of the value's {@code source}
         * struct.
         *
         * @return the timestamp, or {@code null} if {@code struct} or {@code dataCollectionId} is
         *     {@code null} or the field holds no value
         */
        @Override
        public Instant getEventTimestamp(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            // The source struct is looked up before the collection check, as in the original flow.
            final Struct sourceInfo = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            final Long timestampMs = sourceInfo.getInt64("ts_ms");
            return timestampMs == null ? null : Instant.ofEpochMilli(timestampMs);
        }

        /**
         * Returns the binlog file, position and row-in-event of the event, keyed by {@link
         * #BINLOG_FILENAME_OFFSET_KEY}, {@link #BINLOG_POSITION_OFFSET_KEY} and {@link
         * #BINLOG_ROW_IN_EVENT_OFFSET_KEY}.
         *
         * @return the position map, or {@code null} if {@code struct} or {@code dataCollectionId}
         *     is {@code null}
         */
        @Override
        public Map<String, String> getEventSourcePosition(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            final Struct sourceInfo = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            return Collect.hashMapOf(
                    BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
                    BINLOG_POSITION_OFFSET_KEY, Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)),
                    BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
        }

        /**
         * Returns the transaction id tracked by the offset context.
         *
         * @param offsetContext must be a {@link MySqlOffsetContext}
         */
        @Override
        public String getTransactionId(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            return ((MySqlOffsetContext) offsetContext).getTransactionId();
        }
    }
}
