package io.confluent.ksql.execution.scalablepush.consumer;

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/consumer/CatchupCoordinatorImpl.class */
public class CatchupCoordinatorImpl implements CatchupCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(CatchupCoordinatorImpl.class);
    private static final long WAIT_TIME_MS = 10000;
    private final Clock clock;
    private int catchupJoiners;
    private boolean latestWaiting;

    public CatchupCoordinatorImpl() {
        this(Clock.systemUTC());
    }

    public CatchupCoordinatorImpl(Clock clock) {
        this.catchupJoiners = 0;
        this.latestWaiting = false;
        this.clock = clock;
    }

    @Override // io.confluent.ksql.execution.scalablepush.consumer.CatchupCoordinator
    public synchronized void checkShouldWaitForCatchup() {
        long millis = this.clock.millis();
        while (this.catchupJoiners > 0) {
            try {
                this.latestWaiting = true;
                LOG.info("Waiting for Catchups to join Latest consumer, current count {}", Integer.valueOf(this.catchupJoiners));
                wait(WAIT_TIME_MS);
                long millis2 = this.clock.millis() - millis;
                LOG.info("Waited for Catchups to join Latest consumer for {}ms and current count {}", Long.valueOf(millis2), Integer.valueOf(this.catchupJoiners));
                if (millis2 >= WAIT_TIME_MS) {
                    break;
                }
            } catch (InterruptedException e) {
                LOG.error("Caught InterruptedException during catchup waiting", e);
                Thread.currentThread().interrupt();
                throw new RuntimeException("InterruptedException during catchup waiting", e);
            }
        }
        this.latestWaiting = false;
    }

    @Override // io.confluent.ksql.execution.scalablepush.consumer.CatchupCoordinator
    public synchronized boolean checkShouldCatchUp(AtomicBoolean atomicBoolean, Function<Boolean, Boolean> function, Runnable runnable) {
        if (this.latestWaiting && function.apply(false).booleanValue()) {
            LOG.info("Catchup is joining latest, about to decrement {}", Integer.valueOf(this.catchupJoiners));
            if (atomicBoolean.get()) {
                atomicBoolean.set(false);
                this.catchupJoiners--;
                notify();
            }
            runnable.run();
            return true;
        }
        if (atomicBoolean.get() || !function.apply(true).booleanValue()) {
            return false;
        }
        LOG.info("Signalling Latest, about to increment {}", Integer.valueOf(this.catchupJoiners));
        atomicBoolean.set(true);
        this.catchupJoiners++;
        return false;
    }

    @Override // io.confluent.ksql.execution.scalablepush.consumer.CatchupCoordinator
    public synchronized void catchupIsClosing(AtomicBoolean atomicBoolean) {
        if (atomicBoolean.get()) {
            atomicBoolean.set(false);
            this.catchupJoiners--;
        }
    }

    @VisibleForTesting
    public synchronized void simulateWaitingInTest() {
        if (this.catchupJoiners > 0) {
            this.latestWaiting = true;
        }
    }
}
