package io.confluent.ksql.execution.scalablepush.consumer;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.time.Clock;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/consumer/LatestConsumer.class */
/**
 * A {@link ScalablePushConsumer} that subscribes to the topic through Kafka's group management
 * and, on its first partition assignment, decides whether to resume from the committed offsets
 * or to skip ahead to the end of every assigned partition.
 *
 * <p>After every processed batch, and whenever a poll returns no records, the consumer asks the
 * {@link CatchupCoordinator} whether it should wait for catch-up consumers.
 */
public class LatestConsumer extends ScalablePushConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(LatestConsumer.class);

    /** Config key: delay, in milliseconds, applied before subscribing to the topic. */
    private static final String NEW_LATEST_DELAY_MS_CONFIG = "ksql.query.push.v2.new.latest.delay.ms";

    /** Config key: age, in milliseconds, beyond which committed offsets are considered stale. */
    private static final String LATEST_RESET_AGE_MS_CONFIG = "ksql.query.push.v2.latest.reset.age.ms";

    private final CatchupCoordinator catchupCoordinator;
    private final Consumer<Collection<TopicPartition>> catchupAssignmentUpdater;
    private final KsqlConfig ksqlConfig;

    /** Set once the first non-null partition assignment has been handled. */
    private boolean gotFirstAssignment;

    /** Factory abstraction mirroring the {@link LatestConsumer} constructor. */
    public interface LatestConsumerFactory {
        /**
         * Creates a new {@link LatestConsumer}; the arguments are the same as those of
         * {@link LatestConsumer#LatestConsumer(String, boolean, LogicalSchema, KafkaConsumer,
         * CatchupCoordinator, Consumer, KsqlConfig, Clock)}.
         */
        LatestConsumer create(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, CatchupCoordinator catchupCoordinator, Consumer<Collection<TopicPartition>> consumer, KsqlConfig ksqlConfig, Clock clock);
    }

    /**
     * Creates the consumer. Nothing is subscribed until {@link #subscribeOrAssign()} runs.
     *
     * @param str forwarded unchanged to the {@link ScalablePushConsumer} constructor
     * @param z forwarded unchanged to the {@link ScalablePushConsumer} constructor
     * @param logicalSchema forwarded unchanged to the {@link ScalablePushConsumer} constructor
     * @param kafkaConsumer forwarded unchanged to the {@link ScalablePushConsumer} constructor
     * @param catchupCoordinator consulted after each batch and on empty polls
     * @param consumer callback that receives every new, non-null partition assignment
     * @param ksqlConfig source of the subscribe delay and the reset-age settings
     * @param clock forwarded unchanged to the {@link ScalablePushConsumer} constructor
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public LatestConsumer(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, CatchupCoordinator catchupCoordinator, Consumer<Collection<TopicPartition>> consumer, KsqlConfig ksqlConfig, Clock clock) {
        super(str, z, logicalSchema, kafkaConsumer, clock);
        this.gotFirstAssignment = false;
        this.catchupCoordinator = catchupCoordinator;
        this.catchupAssignmentUpdater = consumer;
        this.ksqlConfig = ksqlConfig;
    }

    /**
     * Sleeps for the configured delay and then subscribes to the topic, registering a rebalance
     * listener that tracks assignments and, on the first one, calls {@link #maybeSeekToEnd()}.
     *
     * @throws RuntimeException if the thread is interrupted during the delay; the interrupt
     *     flag is restored before throwing
     */
    @Override // io.confluent.ksql.execution.scalablepush.consumer.ScalablePushConsumer
    protected void subscribeOrAssign() {
        final long delayMs = this.ksqlConfig.getLong(NEW_LATEST_DELAY_MS_CONFIG);
        // Only the sleep can raise InterruptedException, so the try block covers nothing else.
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Got interrupted", e);
        }

        this.consumer.subscribe(ImmutableList.of(this.topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                LOG.info("Latest consumer had partitions revoked {}", collection);
                // A null assignment is reported to the base class on revocation.
                newAssignment(null);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                LOG.info("Latest consumer had partitions assigned {}", collection);
                if (collection == null) {
                    return;
                }
                newAssignment(collection);
                updateCurrentPositions();
                catchupAssignmentUpdater.accept(collection);
                // The seek-to-end decision is made only once, on the first assignment.
                if (!gotFirstAssignment) {
                    maybeSeekToEnd();
                }
                gotFirstAssignment = true;
            }
        });
    }

    /** Asks the catch-up coordinator whether to wait when a poll returned no records. */
    @Override // io.confluent.ksql.execution.scalablepush.consumer.ScalablePushConsumer
    protected void onEmptyRecords() {
        this.catchupCoordinator.checkShouldWaitForCatchup();
    }

    /** Asks the catch-up coordinator whether to wait after a batch has been processed. */
    @Override // io.confluent.ksql.execution.scalablepush.consumer.ScalablePushConsumer
    protected void afterBatchProcessed() {
        this.catchupCoordinator.checkShouldWaitForCatchup();
    }

    /** Intentionally a no-op: assignments are handled in the rebalance listener. */
    @Override // io.confluent.ksql.execution.scalablepush.consumer.ScalablePushConsumer
    protected void onNewAssignment() {
    }

    /**
     * Seeks every assigned partition to its end unless at least one partition has a committed
     * offset that is "recent".
     *
     * <p>A committed offset is recent when it is at or beyond the offset the broker reports for
     * the timestamp {@code now - reset age}. If any partition qualifies, the consumer keeps its
     * current positions; otherwise it seeks to the end and refreshes the tracked positions.
     *
     * <p>Invoked from the rebalance listener registered in {@link #subscribeOrAssign()}.
     */
    public void maybeSeekToEnd() {
        final Set<TopicPartition> assignedPartitions = this.topicPartitions.get();
        final long maxAgeMs = this.ksqlConfig.getLong(LATEST_RESET_AGE_MS_CONFIG);
        final long cutoffMs = this.clock.millis() - maxAgeMs;

        // Query every assigned partition at the same cutoff timestamp.
        final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition partition : assignedPartitions) {
            timestampsToSearch.put(partition, cutoffMs);
        }

        final Map<TopicPartition, OffsetAndTimestamp> offsetsAtCutoff =
                this.consumer.offsetsForTimes(timestampsToSearch);
        final Map<TopicPartition, OffsetAndMetadata> committedOffsets =
                this.consumer.committed(assignedPartitions);
        LOG.info("Latest maybe seeking to end offsetAndTimestampMap {}, offsetAndMetadataMap {}", offsetsAtCutoff, committedOffsets);

        if (hasRecentCommit(offsetsAtCutoff, committedOffsets)) {
            return;
        }
        this.consumer.seekToEnd(assignedPartitions);
        updateCurrentPositions();
        LOG.info("LatestConsumer seeking to end {}", this.currentPositions);
    }

    /**
     * Returns whether any partition's committed offset is at or past its offset at the cutoff.
     * Partitions with no committed offset, or no offset at the cutoff, are skipped.
     */
    private static boolean hasRecentCommit(
            Map<TopicPartition, OffsetAndTimestamp> offsetsAtCutoff,
            Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsAtCutoff.entrySet()) {
            final OffsetAndMetadata committed = committedOffsets.get(entry.getKey());
            final OffsetAndTimestamp atCutoff = entry.getValue();
            if (committed != null && atCutoff != null && atCutoff.offset() <= committed.offset()) {
                // One recent partition is enough; no need to inspect the rest.
                return true;
            }
        }
        return false;
    }
}
