package io.debezium.pipeline;

import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.actions.SignalActionProvider;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/pipeline/ChangeEventSourceCoordinator.class */
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);
    // Timeout applied to each awaitTermination() call made by stop().
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90);
    // Offsets handed to the constructor; start() passes them to executeChangeEventSources().
    protected final Offsets<P, O> previousOffsets;
    // Receives any Throwable escaping the task submitted by start().
    protected final ErrorHandler errorHandler;
    // Supplies the snapshot, streaming and incremental-snapshot sources.
    protected final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
    // Supplies the snapshot and streaming metrics created in start().
    protected final ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory;
    // Single-thread executor running the task submitted by start().
    protected final ExecutorService executor;
    // Single-thread executor running the tasks submitted by doBlockingSnapshot().
    private final ExecutorService blockingSnapshotExecutor;
    protected final EventDispatcher<P, ?> eventDispatcher;
    protected final DatabaseSchema<?> schema;
    // May be null; getSignalProcessor() wraps it with Optional.ofNullable().
    protected final SignalProcessor<P, O> signalProcessor;
    // May be null; stop() checks for null before stopping it.
    protected final NotificationService<P, O> notificationService;
    protected final CommonConnectorConfig connectorConfig;
    // Set to true by start() and to false by stop(); exposed through the context's isRunning().
    private volatile boolean running;
    // Set to true when a blocking snapshot task starts and back to false once it completed or was
    // skipped; exposed through the context's isPaused().
    private volatile boolean paused;
    // Set to true by the blocking snapshot task and after waitSnapshotCompletion() wakes up;
    // set to false by the context's streamingPaused().
    private volatile boolean streaming;
    // Assigned in initStreamEvents(); reset to null in doSnapshot() after catch-up streaming was performed.
    protected volatile StreamingChangeEventSource<P, O> streamingSource;
    // Held while doSnapshot() clears streamingSource; commitOffset() does nothing while it is locked.
    protected final ReentrantLock commitOffsetLock = new ReentrantLock();
    // Both metrics objects are created in start() and unregistered in stop().
    protected SnapshotChangeEventSourceMetrics<P> snapshotMetrics;
    protected StreamingChangeEventSourceMetrics<P> streamingMetrics;
    // Created by the task submitted in start(); also used by the blocking snapshot task.
    private ChangeEventSource.ChangeEventSourceContext context;
    // Obtained from the factory by the task submitted in start(); reused for blocking snapshots.
    private SnapshotChangeEventSource<P, O> snapshotSource;
    // Holds the most recent value returned by CdcSourceTaskContext.configureLoggingContext();
    // re-created on every start().
    private AtomicReference<LoggingContext.PreviousContext> previousLogContext;
    // Task context passed to start(); used by the blocking snapshot task to switch logging context.
    private CdcSourceTaskContext taskContext;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/pipeline/ChangeEventSourceCoordinator$CatchUpStreamingResult.class */
    /**
     * Outcome of {@code executeCatchUpStreaming}: tells {@code doSnapshot} whether
     * catch-up streaming actually ran before the snapshot.
     */
    public class CatchUpStreamingResult {

        /** {@code true} when catch-up streaming was performed. */
        public boolean performedCatchUpStreaming;

        /**
         * @param z whether catch-up streaming was performed
         */
        public CatchUpStreamingResult(boolean z) {
            performedCatchUpStreaming = z;
        }
    }

    /* loaded from: input_file:io/debezium/pipeline/ChangeEventSourceCoordinator$ChangeEventSourceContextImpl.class */
    /**
     * Context handed to the change event sources. It exposes the coordinator's
     * {@code running}/{@code paused} flags and provides the hand-shake between the
     * streaming thread and a blocking snapshot, built on one lock with two conditions.
     */
    public class ChangeEventSourceContextImpl implements ChangeEventSource.ChangeEventSourceContext {
        private final Lock lock = new ReentrantLock();
        // Signalled by resumeStreaming(); awaited by waitSnapshotCompletion().
        private final Condition snapshotFinished = this.lock.newCondition();
        // Signalled by streamingPaused(); awaited by waitStreamingPaused().
        private final Condition streamingPaused = this.lock.newCondition();

        public ChangeEventSourceContextImpl() {
        }

        /**
         * @return the coordinator's current {@code paused} flag
         */
        @Override
        public boolean isPaused() {
            return ChangeEventSourceCoordinator.this.paused;
        }

        /**
         * @return the coordinator's current {@code running} flag
         */
        @Override
        public boolean isRunning() {
            return ChangeEventSourceCoordinator.this.running;
        }

        /**
         * Wakes every thread blocked in {@link #waitSnapshotCompletion()}.
         */
        @Override
        public void resumeStreaming() {
            this.lock.lock();
            try {
                this.snapshotFinished.signalAll();
                ChangeEventSourceCoordinator.LOGGER.trace("Streaming will now resume.");
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Blocks for as long as the coordinator is paused. After each wake-up the
         * coordinator's {@code streaming} flag is set back to {@code true}.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        @Override
        public void waitSnapshotCompletion() throws InterruptedException {
            this.lock.lock();
            try {
                // The loop must sit inside the try so that the lock is held for every await()
                // and is released exactly once, including when the condition is already false.
                while (ChangeEventSourceCoordinator.this.paused) {
                    ChangeEventSourceCoordinator.LOGGER.trace("Waiting for snapshot to be completed.");
                    this.snapshotFinished.await();
                    ChangeEventSourceCoordinator.this.streaming = true;
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Clears the coordinator's {@code streaming} flag and wakes every thread
         * blocked in {@link #waitStreamingPaused()}.
         */
        @Override
        public void streamingPaused() {
            this.lock.lock();
            try {
                ChangeEventSourceCoordinator.LOGGER.trace("Streaming paused. Blocking snapshot can now start.");
                ChangeEventSourceCoordinator.this.streaming = false;
                this.streamingPaused.signalAll();
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Blocks for as long as the coordinator's {@code streaming} flag is set.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        @Override
        public void waitStreamingPaused() throws InterruptedException {
            this.lock.lock();
            try {
                // Same shape as waitSnapshotCompletion(): re-check the flag after every wake-up
                // while still holding the lock, and unlock once on the way out.
                while (ChangeEventSourceCoordinator.this.streaming) {
                    ChangeEventSourceCoordinator.LOGGER.trace("Requested a blocking snapshot. Waiting for streaming to be paused.");
                    this.streamingPaused.await();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    public ChangeEventSourceCoordinator(Offsets<P, O> offsets, ErrorHandler errorHandler, Class<? extends SourceConnector> cls, CommonConnectorConfig commonConnectorConfig, ChangeEventSourceFactory<P, O> changeEventSourceFactory, ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory, EventDispatcher<P, ?> eventDispatcher, DatabaseSchema<?> databaseSchema, SignalProcessor<P, O> signalProcessor, NotificationService<P, O> notificationService) {
        this.previousOffsets = offsets;
        this.errorHandler = errorHandler;
        this.changeEventSourceFactory = changeEventSourceFactory;
        this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
        this.executor = Threads.newSingleThreadExecutor(cls, commonConnectorConfig.getLogicalName(), "change-event-source-coordinator");
        this.blockingSnapshotExecutor = Threads.newSingleThreadExecutor(cls, commonConnectorConfig.getLogicalName(), "blocking-snapshot");
        this.eventDispatcher = eventDispatcher;
        this.schema = databaseSchema;
        this.signalProcessor = signalProcessor;
        this.notificationService = notificationService;
        this.connectorConfig = commonConnectorConfig;
    }

    /**
     * Creates the metrics, marks the coordinator as running, submits the main
     * snapshot/streaming task to the executor and, when a signal processor applies,
     * registers its actions and starts it. Before returning (normally or not), the
     * logging context currently held in {@code previousLogContext} is restored if present.
     *
     * @param cdcSourceTaskContext task context used to configure the logging context
     * @param changeEventQueueMetrics queue metrics passed to the metrics factory
     * @param eventMetadataProvider metadata provider passed to the metrics factory
     */
    public synchronized void start(CdcSourceTaskContext cdcSourceTaskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider eventMetadataProvider) {
        this.previousLogContext = new AtomicReference<>();
        try {
            this.taskContext = cdcSourceTaskContext;
            this.snapshotMetrics = this.changeEventSourceMetricsFactory.getSnapshotMetrics(cdcSourceTaskContext, changeEventQueueMetrics, eventMetadataProvider);
            this.streamingMetrics = this.changeEventSourceMetricsFactory.getStreamingMetrics(cdcSourceTaskContext, changeEventQueueMetrics, eventMetadataProvider);
            this.running = true;

            // Main task: register metrics, build the context and run snapshot followed by streaming.
            this.executor.submit(() -> {
                try {
                    this.previousLogContext.set(cdcSourceTaskContext.configureLoggingContext(AbstractSourceInfo.SNAPSHOT_KEY));
                    this.snapshotMetrics.register();
                    this.streamingMetrics.register();
                    LOGGER.info("Metrics registered");
                    this.context = new ChangeEventSourceContextImpl();
                    LOGGER.info("Context created");
                    this.snapshotSource = this.changeEventSourceFactory.getSnapshotChangeEventSource(this.snapshotMetrics, this.notificationService);
                    executeChangeEventSources(cdcSourceTaskContext, this.snapshotSource, this.previousOffsets, this.previousLogContext, this.context);
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Change event source executor was interrupted", interrupted);
                }
                catch (Throwable failure) {
                    this.errorHandler.setProducerThrowable(failure);
                }
                finally {
                    streamingConnected(false);
                }
            });

            getSignalProcessor(this.previousOffsets)
                    .ifPresent(processor -> registerSignalActionsAndStartProcessor(processor, this.eventDispatcher, this, this.connectorConfig));
        }
        finally {
            // Runs on both the normal and the exceptional path.
            if (this.previousLogContext.get() != null) {
                this.previousLogContext.get().restore();
            }
        }
    }

    /**
     * Discovers every {@link SignalActionProvider} through {@link ServiceLoader}, asks each one
     * for its actions, registers all of them on the given signal processor and starts it.
     * <p>
     * All providers are loaded before any of them is asked for actions. Two providers returning
     * the same action key make {@code Collectors.toMap} throw {@link IllegalStateException}.
     *
     * @param signalProcessor processor the actions are registered on; must not be {@code null}
     * @param eventDispatcher dispatcher passed to each provider
     * @param changeEventSourceCoordinator coordinator passed to each provider
     * @param commonConnectorConfig connector configuration passed to each provider
     */
    protected void registerSignalActionsAndStartProcessor(SignalProcessor<P, O> signalProcessor, EventDispatcher<P, ? extends DataCollectionId> eventDispatcher, ChangeEventSourceCoordinator<P, ?> changeEventSourceCoordinator, CommonConnectorConfig commonConnectorConfig) {
        // Materialize the lazily-loaded providers first so a broken provider fails before any action is created.
        List<SignalActionProvider> providers = StreamSupport.stream(ServiceLoader.load(SignalActionProvider.class).spliterator(), false)
                .collect(Collectors.toList());

        Map<String, SignalAction<P>> actions = providers.stream()
                .map(provider -> provider.createActions(eventDispatcher, changeEventSourceCoordinator, commonConnectorConfig))
                .flatMap(created -> created.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // The bound method reference performs the null check on signalProcessor.
        actions.forEach(signalProcessor::registerSignalAction);
        signalProcessor.start();
    }

    /**
     * Returns the signal processor when {@code offsets} is {@code null} or holds exactly one
     * entry; otherwise an empty optional. The result is also empty when no signal processor
     * was supplied to the constructor.
     *
     * @param offsets offsets to inspect, may be {@code null}
     * @return the signal processor, if applicable
     */
    public Optional<SignalProcessor<P, O>> getSignalProcessor(Offsets<P, O> offsets) {
        final boolean applicable = offsets == null || offsets.getOffsets().size() == 1;
        if (!applicable) {
            return Optional.empty();
        }
        return Optional.ofNullable(this.signalProcessor);
    }

    /**
     * Runs the snapshot for the only partition/offset in {@code offsets}, hands the resulting
     * offset to the signal processor (if any) and, when the coordinator is still running and the
     * snapshot completed or was skipped, switches the logging context and starts streaming.
     *
     * @param cdcSourceTaskContext task context used to configure the logging context
     * @param snapshotChangeEventSource source executing the snapshot
     * @param offsets offsets providing the single partition and offset
     * @param atomicReference holder updated with each previous logging context
     * @param changeEventSourceContext context passed to the sources
     * @throws InterruptedException if snapshotting or streaming is interrupted
     */
    protected void executeChangeEventSources(CdcSourceTaskContext cdcSourceTaskContext, SnapshotChangeEventSource<P, O> snapshotChangeEventSource, Offsets<P, O> offsets, AtomicReference<LoggingContext.PreviousContext> atomicReference, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        final P partition = offsets.getTheOnlyPartition();
        final O initialOffset = offsets.getTheOnlyOffset();

        atomicReference.set(cdcSourceTaskContext.configureLoggingContext(AbstractSourceInfo.SNAPSHOT_KEY, partition));
        final SnapshotResult<O> snapshotResult = doSnapshot(snapshotChangeEventSource, changeEventSourceContext, partition, initialOffset);

        getSignalProcessor(offsets).ifPresent(processor -> {
            processor.setContext(snapshotResult.getOffset());
        });
        LOGGER.debug("Snapshot result {}", snapshotResult);

        if (!this.running || !snapshotResult.isCompletedOrSkipped()) {
            return;
        }
        atomicReference.set(cdcSourceTaskContext.configureLoggingContext("streaming", partition));
        streamEvents(changeEventSourceContext, partition, snapshotResult.getOffset());
    }

    /**
     * Schedules a blocking snapshot on the dedicated executor. The task marks the coordinator as
     * paused, waits until streaming reports that it has paused, runs the snapshot and, when the
     * coordinator is still running and the snapshot completed or was skipped, clears the paused
     * flag and signals streaming to resume.
     * <p>
     * NOTE(review): the {@code Future} returned by {@code submit} is discarded, so an exception
     * thrown by the task is not observed by the caller of this method.
     *
     * @param p partition to snapshot
     * @param offsetContext offset context to snapshot from; must be an instance of {@code O}
     * @param snapshotConfiguration configuration of the requested snapshot
     */
    public void doBlockingSnapshot(P p, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) {
        this.blockingSnapshotExecutor.submit(() -> {
            this.previousLogContext.set(this.taskContext.configureLoggingContext("streaming", p));
            this.paused = true;
            this.streaming = true;
            try {
                this.context.waitStreamingPaused();
                this.previousLogContext.set(this.taskContext.configureLoggingContext(AbstractSourceInfo.SNAPSHOT_KEY));
                LOGGER.info("Starting snapshot");

                // doSnapshot() is declared on O, while this method only receives the OffsetContext
                // upper bound, so an explicit (unchecked) narrowing is required.
                @SuppressWarnings("unchecked")
                final O typedOffset = (O) offsetContext;
                final SnapshottingTask snapshottingTask = this.snapshotSource.getBlockingSnapshottingTask(p, typedOffset, snapshotConfiguration);
                final SnapshotResult<O> snapshotResult = doSnapshot(this.snapshotSource, this.context, p, typedOffset, snapshottingTask);

                if (this.running && snapshotResult.isCompletedOrSkipped()) {
                    this.previousLogContext.set(this.taskContext.configureLoggingContext("streaming", p));
                    this.paused = false;
                    this.context.resumeStreaming();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag for the executor thread and keep the original cause.
                Thread.currentThread().interrupt();
                throw new DebeziumException("Blocking snapshot has been interrupted", e);
            }
        });
    }

    /**
     * Runs a snapshot using the snapshotting task that the snapshot source derives from the
     * given partition and offset.
     *
     * @param snapshotChangeEventSource source executing the snapshot
     * @param changeEventSourceContext context passed to the source
     * @param p partition to snapshot
     * @param o offset to snapshot from
     * @return the snapshot result
     * @throws InterruptedException if the snapshot is interrupted
     */
    protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotChangeEventSource, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, P p, O o) throws InterruptedException {
        final SnapshottingTask snapshottingTask = snapshotChangeEventSource.getSnapshottingTask(p, o);
        return doSnapshot(snapshotChangeEventSource, changeEventSourceContext, p, o, snapshottingTask);
    }

    /**
     * Performs optional catch-up streaming, then executes the given snapshotting task.
     * <p>
     * When catch-up streaming ran, the connected metric is reset and {@code streamingSource} is
     * cleared while holding {@code commitOffsetLock}. After the snapshot, the schema is checked
     * for emptiness if the snapshot completed or the schema reports complete table information.
     *
     * @param snapshotChangeEventSource source executing the snapshot
     * @param changeEventSourceContext context passed to the source
     * @param p partition to snapshot
     * @param o offset to snapshot from
     * @param snapshottingTask task describing what to snapshot
     * @return the snapshot result
     * @throws InterruptedException if catch-up streaming or the snapshot is interrupted
     */
    protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotChangeEventSource, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, P p, O o, SnapshottingTask snapshottingTask) throws InterruptedException {
        final CatchUpStreamingResult catchUp = executeCatchUpStreaming(changeEventSourceContext, snapshotChangeEventSource, p, o);
        if (catchUp.performedCatchUpStreaming) {
            streamingConnected(false);
            this.commitOffsetLock.lock();
            try {
                this.streamingSource = null;
            }
            finally {
                this.commitOffsetLock.unlock();
            }
        }

        this.eventDispatcher.setEventListener(this.snapshotMetrics);
        final SnapshotResult<O> snapshotResult = snapshotChangeEventSource.execute(changeEventSourceContext, p, o, snapshottingTask);
        LOGGER.info("Snapshot ended with {}", snapshotResult);

        final boolean completed = snapshotResult.getStatus() == SnapshotResult.SnapshotResultStatus.COMPLETED;
        // Short-circuit kept: the schema is only queried when the snapshot did not complete.
        if (completed || this.schema.tableInformationComplete()) {
            this.schema.assureNonEmptySchema();
        }
        return snapshotResult;
    }

    /**
     * Hook invoked by {@code doSnapshot} before the snapshot runs. This implementation performs
     * no catch-up streaming and always reports {@code false}.
     *
     * @param changeEventSourceContext context passed to the sources (unused here)
     * @param snapshotChangeEventSource snapshot source (unused here)
     * @param p partition (unused here)
     * @param o offset (unused here)
     * @return a result whose {@code performedCatchUpStreaming} is {@code false}
     * @throws InterruptedException never thrown by this implementation
     */
    protected ChangeEventSourceCoordinator<P, O>.CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotChangeEventSource<P, O> snapshotChangeEventSource, P p, O o) throws InterruptedException {
        final boolean performed = false;
        return new CatchUpStreamingResult(performed);
    }

    /**
     * Initializes streaming through {@code initStreamEvents} and then executes the streaming
     * source, logging before and after.
     *
     * @param changeEventSourceContext context passed to the streaming source
     * @param p partition to stream
     * @param o offset to stream from
     * @throws InterruptedException if initialization or streaming is interrupted
     */
    protected void streamEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, P p, O o) throws InterruptedException {
        initStreamEvents(p, o);

        LOGGER.info("Starting streaming");
        // Field is read only after initialization and the log statement, as before.
        final StreamingChangeEventSource<P, O> source = this.streamingSource;
        source.execute(changeEventSourceContext, p, o);
        LOGGER.info("Finished streaming");
    }

    /**
     * Prepares streaming: obtains the streaming source from the factory, switches the event
     * listener to the streaming metrics, reports the connected state, initializes the source,
     * passes its offset context to the signal processor (if any) and sets up the optional
     * incremental snapshot source on the dispatcher.
     *
     * @param p partition to stream
     * @param o offset to stream from
     * @throws InterruptedException if initialization is interrupted
     */
    protected void initStreamEvents(P p, O o) throws InterruptedException {
        this.streamingSource = this.changeEventSourceFactory.getStreamingChangeEventSource();
        this.eventDispatcher.setEventListener(this.streamingMetrics);
        streamingConnected(true);
        this.streamingSource.init(o);

        getSignalProcessor(this.previousOffsets)
                .ifPresent(processor -> processor.setContext(this.streamingSource.getOffsetContext()));

        final Optional<IncrementalSnapshotChangeEventSource<P, ? extends DataCollectionId>> incrementalSource = this.changeEventSourceFactory
                .getIncrementalSnapshotChangeEventSource(o, this.snapshotMetrics, this.snapshotMetrics, this.notificationService);
        this.eventDispatcher.setIncrementalSnapshotChangeEventSource(incrementalSource);
        incrementalSource.ifPresent(source -> source.init(p, o));
    }

    /**
     * Forwards an offset commit to the current streaming source. Nothing happens while
     * {@code commitOffsetLock} is held, when there is no streaming source, or when {@code map2}
     * is {@code null}.
     *
     * @param map first map, forwarded unchanged to the streaming source
     * @param map2 second map, forwarded unchanged; a {@code null} value makes this call a no-op
     */
    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
        if (this.commitOffsetLock.isLocked()) {
            return;
        }
        // Read the volatile field once: doSnapshot() may set it to null on another thread
        // between a null check and the call, which would otherwise cause an NPE.
        final StreamingChangeEventSource<P, O> source = this.streamingSource;
        if (source == null || map2 == null) {
            return;
        }
        source.commitOffset(map, map2);
    }

    /**
     * Stops the coordinator: clears the running flag, shuts both executors down gracefully,
     * forces a shutdown of whichever did not terminate within {@link #SHUTDOWN_WAIT_TIMEOUT},
     * then stops the signal processor and notification service and closes the event dispatcher.
     * Both metrics are always unregistered at the end, even if an earlier step fails.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void stop() throws InterruptedException {
        this.running = false;
        try {
            final long timeoutMillis = SHUTDOWN_WAIT_TIMEOUT.toMillis();

            // Clear a pending interrupt so the graceful wait below is actually attempted.
            Thread.interrupted();
            this.executor.shutdown();
            this.blockingSnapshotExecutor.shutdown();
            final boolean coordinatorTerminated = this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            final boolean blockingSnapshotTerminated = this.blockingSnapshotExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);

            if (!coordinatorTerminated) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down executor now");
                Thread.interrupted();
                this.executor.shutdownNow();
                this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (!blockingSnapshotTerminated) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down blocking snapshot executor now");
                Thread.interrupted();
                this.blockingSnapshotExecutor.shutdownNow();
                this.blockingSnapshotExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }

            // isPresent()/get() rather than ifPresent(): stop() is called directly so that any
            // checked exception it declares propagates through this method's throws clause.
            final Optional<SignalProcessor<P, O>> processor = getSignalProcessor(this.previousOffsets);
            if (processor.isPresent()) {
                processor.get().stop();
            }
            if (this.notificationService != null) {
                this.notificationService.stop();
            }
            this.eventDispatcher.close();
        }
        finally {
            // Single cleanup path; the nested finally ensures the streaming metrics are
            // unregistered even when unregistering the snapshot metrics fails.
            try {
                this.snapshotMetrics.unregister();
            }
            finally {
                this.streamingMetrics.unregister();
            }
        }
    }

    /**
     * @return the error handler supplied to the constructor
     */
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    /**
     * Updates the streaming metrics' connected state, but only when the metrics factory reports
     * that the connection metric is handled by the coordinator.
     *
     * @param z the connected state to publish
     */
    protected void streamingConnected(boolean z) {
        if (!this.changeEventSourceMetricsFactory.connectionMetricHandledByCoordinator()) {
            return;
        }
        this.streamingMetrics.connected(z);
        LOGGER.info("Connected metrics set to '{}'", Boolean.valueOf(z));
    }
}
