package io.debezium.connector.spanner.kafka.internal;

import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.lang.Thread;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/kafka/internal/BufferedPublisher.class */
public class BufferedPublisher<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedPublisher.class);
    private volatile Thread thread;
    private final Predicate<V> publishImmediately;
    private final Consumer<V> onPublish;
    private final String taskUid;
    private final AtomicReference<V> value = new AtomicReference<>();
    private final Duration sleepInterval = Duration.ofMillis(100);
    private final Clock clock = Clock.system();

    public BufferedPublisher(String str, String str2, long j, Predicate<V> predicate, Consumer<V> consumer) {
        this.publishImmediately = predicate;
        this.onPublish = consumer;
        this.taskUid = str;
        this.thread = new Thread(() -> {
            Instant now = Instant.now();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (Instant.now().isAfter(now.plus((TemporalAmount) Duration.ofSeconds(600L)))) {
                        LOGGER.info("Task Uid {} is still publishing with AtomicReference value {}", this.taskUid, Boolean.valueOf(this.value.get() == null));
                        now = Instant.now();
                    }
                    publishBuffered();
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "SpannerConnector-" + str2);
    }

    public void buffer(V v) {
        if (!this.publishImmediately.test(v)) {
            this.value.set(v);
            return;
        }
        synchronized (this) {
            this.value.set(null);
            this.onPublish.accept(v);
        }
    }

    private synchronized void publishBuffered() {
        V andSet = this.value.getAndSet(null);
        if (andSet != null) {
            this.onPublish.accept(andSet);
        }
    }

    public void start() {
        this.thread.start();
    }

    public void close() {
        LOGGER.info("Stopping BufferedPublisher for Task Uid {}", this.taskUid, Boolean.valueOf(this.value.get() == null));
        this.thread.interrupt();
        Metronome sleeper = Metronome.sleeper(this.sleepInterval, this.clock);
        while (!this.thread.getState().equals(Thread.State.TERMINATED)) {
            try {
                sleeper.pause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.thread = null;
        LOGGER.info("Stopped BufferedPublisher for Task Uid {}", this.taskUid, Boolean.valueOf(this.value.get() == null));
    }
}
