package io.debezium.connector.spanner;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.spanner.context.offset.SpannerOffsetContext;
import io.debezium.connector.spanner.context.offset.SpannerOffsetContextFactory;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.metadata.TableId;
import io.debezium.connector.spanner.db.model.Mod;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.model.event.ChildPartitionsEvent;
import io.debezium.connector.spanner.db.model.event.DataChangeEvent;
import io.debezium.connector.spanner.db.model.event.FinishPartitionEvent;
import io.debezium.connector.spanner.db.model.event.HeartbeatEvent;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.db.stream.PartitionEventListener;
import io.debezium.connector.spanner.exception.FinishingPartitionTimeout;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.ChildPartitionsMetricEvent;
import io.debezium.connector.spanner.processor.SourceRecordUtils;
import io.debezium.connector.spanner.processor.SpannerChangeRecordEmitter;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/SpannerStreamingChangeEventSource.class */
/**
 * Streaming change event source of the Spanner connector.
 *
 * <p>{@link #execute} starts a dedicated processing thread that takes events from the
 * {@link StreamEventQueue} and dispatches them by type, and then runs the {@link ChangeStream},
 * which puts the events it produces onto that same queue.
 *
 * <p>Failures raised on either thread are handed to the {@link ErrorHandler} via
 * {@code setProducerThrowable}.
 */
public class SpannerStreamingChangeEventSource implements CommittingRecordsStreamingChangeEventSource<SpannerPartition, SpannerOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerStreamingChangeEventSource.class);

    /** Fixed reaction to a stuck partition; consulted by the listener's {@code onStuckPartition}. */
    private static final StuckPartitionStrategy STUCK_PARTITION_STRATEGY = StuckPartitionStrategy.ESCALATE;

    /** Timeout handed to the {@link FinishPartitionWatchDog}. */
    private static final Duration FINISHING_PARTITION_TIMEOUT = Duration.ofSeconds(60);

    private final FinishPartitionStrategy finishPartitionStrategy;
    private final ErrorHandler errorHandler;
    private final StreamEventQueue eventQueue;
    private final MetricsEventPublisher metricsEventPublisher;
    private final ChangeStream stream;
    private final PartitionManager partitionManager;
    private final SchemaRegistry schemaRegistry;
    private final SpannerEventDispatcher spannerEventDispatcher;
    private final FinishingPartitionManager finishingPartitionManager;
    private final FinishPartitionWatchDog finishPartitionWatchDog;
    private final SpannerOffsetContextFactory offsetContextFactory;
    private final SpannerConnectorConfig connectorConfig;

    /** Queue-draining thread created by {@link #startProcessing}; {@code null} until then. */
    private volatile Thread thread;

    /**
     * Wires the collaborators and creates the finishing-partition manager and its watchdog.
     *
     * @param spannerConnectorConfig connector configuration
     * @param errorHandler receives every failure reported by this source
     * @param changeStream stream that is run by {@link #execute}
     * @param streamEventQueue queue between the change stream and the processing thread
     * @param metricsEventPublisher publisher for metric events
     * @param partitionManager receives partition state updates; must not be {@code null}
     * @param schemaRegistry used to check the schema of incoming data change events
     * @param spannerEventDispatcher dispatches data change and heartbeat events
     * @param finishingAfterCommit {@code true} selects {@link FinishPartitionStrategy#AFTER_COMMIT},
     *     {@code false} selects {@link FinishPartitionStrategy#AFTER_STREAMING_FINISH}
     * @param spannerOffsetContextFactory builds offset contexts for events
     */
    public SpannerStreamingChangeEventSource(SpannerConnectorConfig spannerConnectorConfig, ErrorHandler errorHandler, ChangeStream changeStream, StreamEventQueue streamEventQueue, MetricsEventPublisher metricsEventPublisher, PartitionManager partitionManager, SchemaRegistry schemaRegistry, SpannerEventDispatcher spannerEventDispatcher, boolean finishingAfterCommit, SpannerOffsetContextFactory spannerOffsetContextFactory) {
        this.connectorConfig = spannerConnectorConfig;
        this.offsetContextFactory = spannerOffsetContextFactory;
        this.errorHandler = errorHandler;
        this.eventQueue = streamEventQueue;
        this.metricsEventPublisher = metricsEventPublisher;
        this.stream = changeStream;
        this.partitionManager = partitionManager;
        this.schemaRegistry = schemaRegistry;
        this.spannerEventDispatcher = spannerEventDispatcher;
        Objects.requireNonNull(partitionManager, "partitionManager");
        this.finishingPartitionManager = new FinishingPartitionManager(spannerConnectorConfig, partitionManager::updateToFinished);
        // The watchdog callback turns its argument into a FinishingPartitionTimeout failure.
        this.finishPartitionWatchDog = new FinishPartitionWatchDog(this.finishingPartitionManager, FINISHING_PARTITION_TIMEOUT,
                timedOut -> processFailure(new FinishingPartitionTimeout(timedOut)));
        this.finishPartitionStrategy = finishingAfterCommit ? FinishPartitionStrategy.AFTER_COMMIT : FinishPartitionStrategy.AFTER_STREAMING_FINISH;
    }

    /**
     * Starts the processing thread and runs the change stream until it returns or fails.
     *
     * <p>An {@link InterruptedException} is only logged and not treated as a failure; any other
     * exception is reported to the error handler. In every case the watchdog is stopped and the
     * processing thread is interrupted before this method returns.
     *
     * @param changeEventSourceContext supplies the running flag for the stream and the processing loop
     * @param spannerPartition not used by this implementation
     * @param spannerOffsetContext not used by this implementation
     * @throws InterruptedException declared by the signature; interrupts raised inside are handled here
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SpannerPartition spannerPartition, SpannerOffsetContext spannerOffsetContext) throws InterruptedException {
        LOGGER.info("Starting streaming...");
        try {
            startProcessing(changeEventSourceContext);
            this.stream.run(changeEventSourceContext::isRunning, this.eventQueue::put, newPartitionEventListener());
        } catch (InterruptedException e) {
            // Must be listed before the generic handler, otherwise an interrupt would be
            // reported to the error handler as a producer failure.
            LOGGER.info("Continue to stop streaming...");
        } catch (Exception e) {
            processFailure(e);
        } finally {
            LOGGER.info("Stopping streaming...");
            this.finishPartitionWatchDog.stop();
            // Read the volatile field once so the null check and the call see the same value.
            Thread processingThread = this.thread;
            if (processingThread != null) {
                processingThread.interrupt();
            }
        }
    }

    /**
     * Creates the listener passed to the change stream; it forwards partition life-cycle
     * callbacks to the finishing-partition manager and the partition manager.
     */
    private PartitionEventListener newPartitionEventListener() {
        return new PartitionEventListener() {
            @Override
            public void onRun(Partition partition) throws InterruptedException {
                finishingPartitionManager.registerPartition(partition.getToken());
                partitionManager.updateToRunning(partition.getToken());
            }

            @Override
            public void onFinish(Partition partition) {
                LOGGER.info("Partition onFinish: {}", partition.getToken());
            }

            @Override
            public void onException(Partition partition, Exception exc) throws InterruptedException {
                // The trailing exception argument makes the logger emit the stack trace as well.
                LOGGER.error("Try to stream again from partition {} after exception {}", partition.getToken(), exc.getMessage(), exc);
                partitionManager.updateToReadyForStreaming(partition.getToken());
            }

            @Override
            public boolean onStuckPartition(String token) throws InterruptedException {
                if (STUCK_PARTITION_STRATEGY.equals(StuckPartitionStrategy.REPEAT_STREAMING)) {
                    LOGGER.warn("Try to requery partition {}", token);
                    partitionManager.updateToReadyForStreaming(token);
                    return false;
                }
                return STUCK_PARTITION_STRATEGY.equals(StuckPartitionStrategy.ESCALATE);
            }
        };
    }

    /**
     * Creates and starts the thread that drains the event queue while the context is running.
     *
     * <p>The loop ends when the thread is interrupted (the interrupt flag is restored) or when
     * event handling throws, in which case the exception is reported to the error handler.
     * Events of a type other than the four handled below are ignored.
     *
     * @param changeEventSourceContext supplies the running flag checked on every iteration
     */
    private void startProcessing(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) {
        Thread processingThread = new Thread(() -> {
            while (changeEventSourceContext.isRunning()) {
                try {
                    ChangeStreamEvent event = this.eventQueue.take();
                    if (event instanceof DataChangeEvent) {
                        processDataChangeEvent((DataChangeEvent) event);
                    } else if (event instanceof HeartbeatEvent) {
                        processHeartBeatEvent((HeartbeatEvent) event);
                    } else if (event instanceof ChildPartitionsEvent) {
                        processChildPartitionsEvent((ChildPartitionsEvent) event);
                    } else if (event instanceof FinishPartitionEvent) {
                        LOGGER.info("Received FinishPartitionEvent for partition {}", event.getMetadata().getPartitionToken());
                        if (this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_COMMIT)) {
                            this.finishingPartitionManager.onPartitionFinishEvent(event.getMetadata().getPartitionToken());
                        } else if (this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_STREAMING_FINISH)) {
                            this.finishingPartitionManager.forceFinish(event.getMetadata().getPartitionToken());
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    processFailure(e);
                    return;
                }
            }
        }, "SpannerConnector-SpannerStreamingChangeEventSource");
        this.thread = processingThread;
        processingThread.start();
    }

    /**
     * Checks the schema for the event's table and dispatches one change record per mod.
     *
     * <p>For every mod a new record is registered with the finishing-partition manager and the
     * value it returns is passed to the record emitter.
     *
     * @param dataChangeEvent event whose mods are dispatched
     * @throws InterruptedException if a called collaborator is interrupted
     */
    @VisibleForTesting
    void processDataChangeEvent(DataChangeEvent dataChangeEvent) throws InterruptedException {
        TableId tableId = TableId.getTableId(dataChangeEvent.getTableName());
        this.schemaRegistry.checkSchema(tableId, dataChangeEvent.getCommitTimestamp(), dataChangeEvent.getRowType());
        SpannerPartition spannerPartition = new SpannerPartition(dataChangeEvent.getPartitionToken());
        for (Mod mod : dataChangeEvent.getMods()) {
            SpannerOffsetContext offsetContext = this.offsetContextFactory.getOffsetContextFromDataChangeEvent(mod.getModNumber(), dataChangeEvent);
            SpannerChangeRecordEmitter emitter = new SpannerChangeRecordEmitter(
                    this.finishingPartitionManager.newRecord(dataChangeEvent.getPartitionToken()),
                    dataChangeEvent.getModType(), mod, spannerPartition, offsetContext, Clock.SYSTEM, this.connectorConfig);
            boolean dispatched = this.spannerEventDispatcher.dispatchDataChangeEvent(spannerPartition, tableId, emitter);
            if (dispatched) {
                LOGGER.debug("DataChangeEvent has been dispatched form table {} with modification: {}, offset{}, event: {}", tableId.getTableName(), mod, offsetContext.getOffset(), dataChangeEvent);
            } else {
                LOGGER.info("DataChangeEvent has not been dispatched form table {} with modification: {}", tableId.getTableName(), mod);
            }
        }
    }

    /**
     * Dispatches a heartbeat for the partition named in the event's metadata.
     *
     * @param heartbeatEvent heartbeat to dispatch
     * @throws InterruptedException if the dispatcher is interrupted
     */
    private void processHeartBeatEvent(HeartbeatEvent heartbeatEvent) throws InterruptedException {
        SpannerOffsetContext offsetContext = this.offsetContextFactory.getOffsetContextFromHeartbeatEvent(heartbeatEvent);
        SpannerPartition spannerPartition = new SpannerPartition(heartbeatEvent.getMetadata().getPartitionToken());
        this.spannerEventDispatcher.alwaysDispatchHeartbeatEvent(spannerPartition, offsetContext);
        LOGGER.debug("Dispatching heartbeat for event {} with partition {} and offset {}", heartbeatEvent, spannerPartition, offsetContext.getOffset());
    }

    /**
     * Converts the event's child partitions into {@link Partition}s, hands them to the partition
     * manager and publishes a metric event carrying the number of children.
     *
     * <p>More than one child is logged as a split, otherwise as a move.
     *
     * @param childPartitionsEvent event describing the new child partitions
     * @throws InterruptedException if the partition manager is interrupted
     */
    private void processChildPartitionsEvent(ChildPartitionsEvent childPartitionsEvent) throws InterruptedException {
        LOGGER.info("Received ChildPartitionsEvent: {}", childPartitionsEvent);
        if (childPartitionsEvent.getChildPartitions().size() > 1) {
            LOGGER.info("A split event occurred {}", childPartitionsEvent);
        } else {
            LOGGER.info("A move event occurred {}", childPartitionsEvent);
        }
        List<Partition> partitions = childPartitionsEvent.getChildPartitions().stream()
                .map(child -> Partition.builder()
                        .token(child.getToken())
                        .parentTokens(child.getParentTokens())
                        .startTimestamp(childPartitionsEvent.getStartTimestamp())
                        .endTimestamp(childPartitionsEvent.getMetadata().getPartitionEndTimestamp())
                        .originPartitionToken(childPartitionsEvent.getMetadata().getPartitionToken())
                        .build())
                .collect(Collectors.toList());
        this.partitionManager.newChildPartitions(partitions);
        this.metricsEventPublisher.publishMetricEvent(new ChildPartitionsMetricEvent(childPartitionsEvent.getChildPartitions().size()));
    }

    /** Reports the given exception to the error handler as the producer failure. */
    private void processFailure(Exception exc) {
        this.errorHandler.setProducerThrowable(exc);
    }

    /**
     * Forwards committed records to the finishing-partition manager.
     *
     * <p>Does nothing unless the strategy is {@link FinishPartitionStrategy#AFTER_COMMIT}.
     * Records for which no token or no record uid can be extracted are skipped.
     *
     * @param list committed source records
     * @throws InterruptedException if the finishing-partition manager is interrupted
     */
    @Override
    public void commitRecords(List<SourceRecord> list) throws InterruptedException {
        if (!this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_COMMIT)) {
            return;
        }
        for (SourceRecord sourceRecord : list) {
            String token = SourceRecordUtils.extractToken(sourceRecord);
            String recordUid = SourceRecordUtils.extractRecordUid(sourceRecord);
            if (token != null && recordUid != null) {
                this.finishingPartitionManager.commitRecord(token, recordUid);
            }
        }
    }
}
