package io.confluent.ksql.execution.scalablepush.consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.scalablepush.ProcessingQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PushOffsetRange;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/consumer/CatchupConsumer.class */
/**
 * A {@link ScalablePushConsumer} that starts reading from the end offsets of a supplied
 * {@link PushOffsetRange} and keeps reading until its positions have caught up with the
 * {@link LatestConsumer}. Once caught up, every registered {@link ProcessingQueue} is registered
 * with the latest consumer, the caught-up callback is invoked for it, and this consumer closes
 * itself.
 *
 * <p>The decision of when to perform the catch-up check and the switch-over is delegated to the
 * {@link CatchupCoordinator}.
 */
public class CatchupConsumer extends ScalablePushConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(CatchupConsumer.class);

    /** Upper bound, in milliseconds, on the wait for a partition assignment to become available. */
    @VisibleForTesting
    static final long WAIT_FOR_ASSIGNMENT_MS = 15000;

    /** Pause, in milliseconds, between successive checks of a processing queue that is at its limit. */
    private static final long QUEUE_FULL_SLEEP_MS = 50L;

    @VisibleForTesting
    protected final Supplier<LatestConsumer> latestConsumerSupplier;
    private final CatchupCoordinator catchupCoordinator;
    private final PushOffsetRange offsetRange;
    private final Consumer<Long> sleepMs;
    private final BiConsumer<Object, Long> waitMs;
    private final long catchupWindow;
    private final AtomicBoolean signalledLatest;

    @VisibleForTesting
    protected final Consumer<ProcessingQueue> caughtUpCallback;

    /** Factory whose signature mirrors the ten-argument {@link CatchupConsumer} constructor. */
    public interface CatchupConsumerFactory {
        CatchupConsumer create(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, Supplier<LatestConsumer> supplier, CatchupCoordinator catchupCoordinator, PushOffsetRange pushOffsetRange, Clock clock, long j, Consumer<ProcessingQueue> consumer);
    }

    /**
     * Creates a catchup consumer with injectable sleep and wait primitives.
     *
     * @param str passed through to the {@link ScalablePushConsumer} constructor
     * @param z passed through to the {@link ScalablePushConsumer} constructor
     * @param logicalSchema passed through to the {@link ScalablePushConsumer} constructor
     * @param kafkaConsumer passed through to the {@link ScalablePushConsumer} constructor
     * @param supplier supplies the current latest consumer; may yield {@code null} if none exists
     * @param catchupCoordinator coordinates the catch-up check, the switch-over and closing
     * @param pushOffsetRange range whose end offsets are used as this consumer's starting offsets
     * @param clock passed through to the {@link ScalablePushConsumer} constructor; also used to
     *     time the wait for an assignment
     * @param consumer invoked with a duration in milliseconds to pause while a queue is full
     * @param biConsumer invoked with a monitor object and a timeout in milliseconds to wait for an
     *     assignment
     * @param j the catchup window: the largest number of offsets a partition may lag the latest
     *     consumer and still count as caught up when the coordinator allows the window
     * @param consumer2 invoked for each processing queue handed over to the latest consumer
     */
    public CatchupConsumer(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, Supplier<LatestConsumer> supplier, CatchupCoordinator catchupCoordinator, PushOffsetRange pushOffsetRange, Clock clock, Consumer<Long> consumer, BiConsumer<Object, Long> biConsumer, long j, Consumer<ProcessingQueue> consumer2) {
        super(str, z, logicalSchema, kafkaConsumer, clock);
        this.signalledLatest = new AtomicBoolean(false);
        this.latestConsumerSupplier = supplier;
        this.catchupCoordinator = catchupCoordinator;
        this.offsetRange = pushOffsetRange;
        this.sleepMs = consumer;
        this.waitMs = biConsumer;
        this.catchupWindow = j;
        this.caughtUpCallback = consumer2;
    }

    /**
     * Creates a catchup consumer that pauses with {@link Thread#sleep(long)} and waits with
     * {@link Object#wait(long)}. See the twelve-argument constructor for the remaining parameters.
     */
    public CatchupConsumer(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, Supplier<LatestConsumer> supplier, CatchupCoordinator catchupCoordinator, PushOffsetRange pushOffsetRange, Clock clock, long j, Consumer<ProcessingQueue> consumer) {
        this(str, z, logicalSchema, kafkaConsumer, supplier, catchupCoordinator, pushOffsetRange, clock,
            CatchupConsumer::sleep, CatchupConsumer::wait, j, consumer);
    }

    /** Re-evaluates whether this consumer has caught up when a poll returned no records. */
    @Override
    protected void onEmptyRecords() {
        checkCaughtUp();
    }

    /** Re-evaluates whether this consumer has caught up after a batch has been processed. */
    @Override
    protected void afterBatchProcessed() {
        checkCaughtUp();
    }

    /**
     * Assigns the awaited partitions to the underlying consumer, updates the current positions
     * and clears the {@code newAssignment} flag.
     *
     * @param optional starting offsets keyed by partition number, forwarded to
     *     {@code updateCurrentPositions}; empty when there are none
     * @throws KsqlException if no assignment becomes available in time
     */
    protected void onNewAssignment(Optional<Map<Integer, Long>> optional) {
        this.consumer.assign(waitForNewAssignmentFromLatestConsumer());
        updateCurrentPositions(optional);
        this.newAssignment = false;
    }

    /** Handles a new assignment without any starting offsets. */
    @Override
    protected void onNewAssignment() {
        onNewAssignment(Optional.empty());
    }

    /**
     * Blocks until {@code topicPartitions} holds a non-null assignment or
     * {@link #WAIT_FOR_ASSIGNMENT_MS} has elapsed according to the clock.
     *
     * <p>The method is synchronized because {@code waitMs} is invoked with {@code this} as the
     * monitor, and the default implementation calls {@link Object#wait(long)} on it, which
     * requires owning that monitor. NOTE(review): the code that publishes the assignment and
     * notifies this monitor is not in this class; presumably it lives in the superclass.
     *
     * @return the non-null set of assigned partitions
     * @throws KsqlException if the timeout elapses first
     */
    protected synchronized Set<TopicPartition> waitForNewAssignmentFromLatestConsumer() {
        final long startMs = this.clock.millis();
        Set<TopicPartition> assignment = this.topicPartitions.get();
        while (assignment == null) {
            final long elapsedMs = this.clock.millis() - startMs;
            if (elapsedMs >= WAIT_FOR_ASSIGNMENT_MS) {
                throw new KsqlException("Timed out waiting for assignment from Latest");
            }
            // Only wait for whatever is left of the overall budget.
            this.waitMs.accept(this, WAIT_FOR_ASSIGNMENT_MS - elapsedMs);
            assignment = this.topicPartitions.get();
        }
        return assignment;
    }

    /**
     * Takes over the latest consumer's assignment instead of subscribing, then seeks every
     * assigned partition that has a non-negative end offset in the offset range to that offset.
     *
     * @throws NullPointerException if no latest consumer is available
     */
    @Override
    protected void subscribeOrAssign() {
        final LatestConsumer latestConsumer = this.latestConsumerSupplier.get();
        Preconditions.checkNotNull(latestConsumer, "Latest should always be started before catchup is run");
        newAssignment(latestConsumer.getAssignment());
        final Map<Integer, Long> startingOffsets = this.offsetRange.getEndOffsets().getSparseRepresentation();
        onNewAssignment(Optional.of(startingOffsets));
        for (final TopicPartition topicPartition : this.consumer.assignment()) {
            final Long startingOffset = startingOffsets.get(topicPartition.partition());
            // Partitions without an entry, or with a negative offset, keep their current position.
            if (startingOffset != null && startingOffset >= 0) {
                this.consumer.seek(topicPartition, startingOffset);
            }
        }
    }

    /**
     * Blocks while the given queue is at its limit, pausing {@link #QUEUE_FULL_SLEEP_MS}
     * milliseconds between checks.
     *
     * <p>If the current thread is interrupted the wait is abandoned. The default sleep
     * implementation restores the interrupt flag rather than propagating the interruption, so
     * without this check every subsequent sleep would return immediately and the loop would spin.
     * The interrupt flag is left set for the caller.
     *
     * @param processingQueue the queue a row has just been offered to
     */
    @Override
    protected void afterOfferedRow(ProcessingQueue processingQueue) {
        while (processingQueue.isAtLimit()) {
            if (Thread.currentThread().isInterrupted()) {
                LOG.warn("Interrupted while waiting for full queue to drain queryid {}", processingQueue.getQueryId());
                return;
            }
            LOG.info("Sleeping for a bit since queue is full queryid {}", processingQueue.getQueryId());
            this.sleepMs.accept(QUEUE_FULL_SLEEP_MS);
        }
    }

    /**
     * Asks the coordinator to evaluate whether this consumer has caught up, supplying the
     * caught-up check and the switch-over action.
     */
    @VisibleForTesting
    protected void checkCaughtUp() {
        LOG.info("Checking to see if we're caught up");
        this.catchupCoordinator.checkShouldCatchUp(
            this.signalledLatest, this::isCaughtUp, this::switchOverToLatest);
    }

    /**
     * Closes this consumer and tells the coordinator that it is closing. The coordinator is
     * notified in a {@code finally} block so the notification is not lost if the superclass
     * close fails.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            this.catchupCoordinator.catchupIsClosing(this.signalledLatest);
        }
    }

    /**
     * Compares this consumer's positions with the latest consumer's current offsets.
     *
     * @param allowWindow whether lagging by at most the catchup window counts as caught up
     * @return {@code false} if there is no latest consumer or it reports no offsets; otherwise
     *     whether every partition has caught up
     */
    private boolean isCaughtUp(final Boolean allowWindow) {
        final LatestConsumer latestConsumer = this.latestConsumerSupplier.get();
        if (latestConsumer == null) {
            return false;
        }
        final Map<TopicPartition, Long> latestOffsets = latestConsumer.getCurrentOffsets();
        if (latestOffsets.isEmpty()) {
            return false;
        }
        return caughtUp(latestOffsets, this.currentPositions.get(), allowWindow.booleanValue(), this.catchupWindow);
    }

    /**
     * Registers every processing queue with the latest consumer, invokes the caught-up callback
     * for each, then clears the queues and closes this consumer. Does nothing beyond logging a
     * warning when no latest consumer is available.
     */
    private void switchOverToLatest() {
        final LatestConsumer latestConsumer = this.latestConsumerSupplier.get();
        if (latestConsumer == null) {
            LOG.warn("Couldn't switch over to latest yet because it's not running");
            return;
        }
        for (final ProcessingQueue processingQueue : this.processingQueues.values()) {
            LOG.info("Switching over from catchup queryid {} to latest", processingQueue.getQueryId());
            latestConsumer.register(processingQueue);
            this.caughtUpCallback.accept(processingQueue);
        }
        // The queues are cleared before close() is called on this consumer.
        this.processingQueues.clear();
        close();
    }

    /**
     * Decides whether the catchup positions have reached the latest offsets.
     *
     * @param latestOffsets offsets of the latest consumer per partition
     * @param catchupOffsets positions of this consumer per partition
     * @param allowWindow when {@code true}, a partition lagging by no more than {@code window}
     *     offsets still counts as caught up; when {@code false}, any lag fails the check
     * @param window the tolerated lag, in offsets, when {@code allowWindow} is set
     * @return {@code true} only if both maps cover the same partitions, no offset is null and
     *     every partition satisfies the lag rule
     */
    private static boolean caughtUp(final Map<TopicPartition, Long> latestOffsets, final Map<TopicPartition, Long> catchupOffsets, final boolean allowWindow, final long window) {
        if (!latestOffsets.keySet().equals(catchupOffsets.keySet())) {
            return false;
        }
        for (final Map.Entry<TopicPartition, Long> entry : latestOffsets.entrySet()) {
            final Long latestOffset = entry.getValue();
            final Long catchupOffset = catchupOffsets.get(entry.getKey());
            if (latestOffset == null || catchupOffset == null) {
                return false;
            }
            if (catchupOffset < latestOffset) {
                // Behind latest: only acceptable when the window applies and the lag fits in it.
                if (!allowWindow || latestOffset - catchupOffset > window) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Sleeps for the given number of milliseconds; on interruption, logs and restores the interrupt flag. */
    private static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while sleeping", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits on the given monitor for at most the given number of milliseconds. The caller must
     * own the monitor.
     *
     * @throws KsqlException if the thread is interrupted while waiting; the interrupt flag is
     *     restored and the interruption is attached as the cause
     */
    private static void wait(Object obj, long j) {
        try {
            obj.wait(j);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for assignment", e);
            Thread.currentThread().interrupt();
            throw new KsqlException("Interrupted while waiting for assignment", e);
        }
    }
}
