package io.debezium.connector.spanner.task.leader;

import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import java.lang.Thread;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/leader/LowWatermarkStampPublisher.class */
/**
 * Runs a background thread that asks the {@link SpannerEventDispatcher} to publish a
 * low watermark stamp event once per configured interval.
 *
 * <p>Lifecycle: {@link #init()} creates the worker thread, {@link #start()} launches it
 * (or resumes publishing), {@link #suspend()} pauses publishing while keeping the thread
 * alive, and {@link #destroy()} stops the thread and publishes one last stamp.
 * All lifecycle methods except {@link #suspend()} and {@link #destroy()} are no-ops when
 * the low watermark feature is disabled in the connector configuration.
 *
 * <p>The {@code publisherThread} field is non-null only between {@link #init()} and the
 * termination of the worker thread; the worker clears it when it exits.
 */
public class LowWatermarkStampPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(LowWatermarkStampPublisher.class);

    /** Length, in milliseconds, of each bounded wait slice used by {@link #destroy()}. */
    private static final long DESTROY_POLL_MILLIS = 50L;

    private final Duration publishInterval;
    private volatile Thread publisherThread;
    private final SpannerEventDispatcher spannerEventDispatcher;
    private final boolean lowWatermarkEnabled;
    private final AtomicBoolean suspendFlag = new AtomicBoolean(false);
    private final Consumer<Throwable> errorHandler;
    private final TaskSyncContextHolder taskSyncContextHolder;

    /**
     * Creates the publisher; no thread is created until {@link #init()} is called.
     *
     * @param spannerConnectorConfig source of the publish interval and of the low watermark enabled flag
     * @param spannerEventDispatcher dispatcher used to publish the stamp events
     * @param consumer callback that receives any uncaught exception thrown by the worker thread
     * @param taskSyncContextHolder holder polled by the worker until it reports being initialized
     */
    public LowWatermarkStampPublisher(SpannerConnectorConfig spannerConnectorConfig, SpannerEventDispatcher spannerEventDispatcher, Consumer<Throwable> consumer, TaskSyncContextHolder taskSyncContextHolder) {
        this.publishInterval = spannerConnectorConfig.getLowWatermarkStampInterval();
        this.spannerEventDispatcher = spannerEventDispatcher;
        this.lowWatermarkEnabled = spannerConnectorConfig.isLowWatermarkEnabled();
        this.errorHandler = consumer;
        this.taskSyncContextHolder = taskSyncContextHolder;
    }

    /**
     * Creates the (not yet started) worker thread if the feature is enabled and no worker
     * thread is currently registered.
     */
    public void init() {
        if (this.lowWatermarkEnabled && this.publisherThread == null) {
            this.publisherThread = createPublisherThread();
        }
    }

    /**
     * Starts the worker thread if it has not been started yet and clears the suspend flag
     * so that stamps are published again.
     *
     * <p>If there is no worker thread (because {@link #init()} was not called or the worker
     * has already terminated) the call is logged and ignored.
     */
    public void start() {
        if (!this.lowWatermarkEnabled) {
            return;
        }
        // Single volatile read: the worker may clear the field concurrently.
        Thread thread = this.publisherThread;
        if (thread == null) {
            LOGGER.warn("LowWatermarkStampPublisher has no publisher thread (not initialized or already terminated), start is ignored");
            return;
        }
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
        this.suspendFlag.set(false);
    }

    /**
     * Pauses publishing. The worker thread keeps running but skips the publish call
     * until {@link #start()} is invoked again.
     */
    public void suspend() {
        this.suspendFlag.set(true);
    }

    /**
     * Stops the worker thread, waits until it has deregistered itself, and then publishes
     * one final low watermark stamp from the calling thread.
     *
     * <p>Does nothing if there is no worker thread or if it was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *         the worker to stop, or while publishing the final stamp
     */
    public void destroy() throws InterruptedException {
        // Work on a snapshot so a concurrent clear by the worker cannot cause a NullPointerException.
        Thread thread = this.publisherThread;
        if (thread == null || thread.getState() == Thread.State.NEW) {
            return;
        }
        this.suspendFlag.set(true);
        thread.interrupt();
        // Wait until the worker clears the field (on exit or in its uncaught-exception handler).
        // Bounded join slices replace a hot spin while still reacting to the field change
        // even if the thread itself has not fully terminated yet.
        while (this.publisherThread == thread) {
            thread.join(DESTROY_POLL_MILLIS);
        }
        this.spannerEventDispatcher.publishLowWatermarkStampEvent();
    }

    /**
     * Builds the worker thread and installs an uncaught-exception handler that logs the
     * failure, clears the thread reference and forwards the throwable to the error handler.
     *
     * @return a new, not yet started thread
     */
    private Thread createPublisherThread() {
        Thread thread = new Thread(this::runPublisher, "SpannerConnector-LowWatermarkStampPublisher");
        thread.setUncaughtExceptionHandler((failedThread, throwable) -> {
            LOGGER.error("LowWatermarkStampPublisher execution error", throwable);
            this.publisherThread = null;
            this.errorHandler.accept(throwable);
        });
        return thread;
    }

    /**
     * Body of the worker thread: waits for the task sync context to be initialized, then
     * publishes a stamp and sleeps for the publish interval until interrupted.
     */
    private void runPublisher() {
        try {
            // NOTE(review): this wait polls without sleeping; consider a back-off if
            // initialization can take long.
            while (!this.taskSyncContextHolder.get().isInitialized()) {
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
            }
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (!this.suspendFlag.get()) {
                        this.spannerEventDispatcher.publishLowWatermarkStampEvent();
                    }
                    Thread.sleep(this.publishInterval.toMillis());
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Clear the reference only when the worker is really finishing, so that
            // destroy() can still find and interrupt a running worker.
            this.publisherThread = null;
        }
    }
}
