package com.hazelcast.jet.kafka.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.LoggingUtil;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/StreamKafkaP.class */
/**
 * Jet source processor that polls records from a fixed list of Kafka topics
 * using a manually-assigned {@link KafkaConsumer}.
 *
 * <p>Partitions of every topic are distributed among all processor instances
 * of the vertex by {@link #handledByThisProcessor(int, int, int, int, int)}.
 * The partition count of each topic is re-checked periodically (see
 * {@link #METADATA_CHECK_INTERVAL_NANOS}) so that partitions added later are
 * picked up and read from their beginning.
 *
 * <p>The last emitted offset and the watermark of each partition owned by this
 * processor are saved to the snapshot under a broadcast key; the processor
 * with global index 0 additionally saves the known partition count of every
 * topic.
 *
 * @param <K> type of the Kafka record key
 * @param <V> type of the Kafka record value
 * @param <T> type of the items produced by the projection function
 */
public final class StreamKafkaP<K, V, T> extends AbstractProcessor {

    /** Preferred local parallelism for vertices using this processor. */
    public static final int PREFERRED_LOCAL_PARALLELISM = 4;

    /** Minimum delay between two successful partition-metadata checks. */
    private static final long METADATA_CHECK_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(5L);

    /** Broadcast key under which the per-topic partition counts are snapshotted. */
    private static final String PARTITION_COUNTS_SNAPSHOT_KEY = "partitionCounts";

    /**
     * Partitions assigned to this processor, mapped to the index under which
     * the partition is registered in {@link #eventTimeMapper}.
     */
    Map<TopicPartition, Integer> currentAssignment = new HashMap<>();

    private final Properties properties;
    private final FunctionEx<? super ConsumerRecord<K, V>, ? extends T> projectionFn;
    private final EventTimeMapper<? super T> eventTimeMapper;
    private List<String> topics;
    private int totalParallelism;
    private KafkaConsumer<K, V> consumer;
    private Traverser<Map.Entry<BroadcastKey<?>, ?>> snapshotTraverser;
    private int processorIndex;

    /** {@code Long.MIN_VALUE} forces a metadata check on the first call. */
    private long nextMetadataCheck = Long.MIN_VALUE;

    /**
     * Key: topic name. Value: offsets indexed by partition number. An entry is
     * {@code -1} until a record of that partition was emitted (or an offset
     * was restored), which also covers partitions not owned by this processor.
     */
    private final Map<String, long[]> offsets = new HashMap<>();

    /** Output of the last poll that was not fully emitted yet. */
    private Traverser<Object> traverser = Traversers.empty();

    /**
     * @param properties      configuration passed to the {@link KafkaConsumer}
     * @param topics          names of the topics to read; duplicates are dropped in {@code init}
     * @param projectionFn    maps a consumed record to an output item; a {@code null}
     *                        result causes the record to be skipped
     * @param eventTimePolicy policy used to create the {@link EventTimeMapper}
     */
    public StreamKafkaP(
            @Nonnull Properties properties,
            @Nonnull List<String> topics,
            @Nonnull FunctionEx<? super ConsumerRecord<K, V>, ? extends T> projectionFn,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        this.properties = properties;
        this.topics = topics;
        this.projectionFn = projectionFn;
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        // Start with zero known partitions per topic; the arrays grow in handleNewPartitions().
        for (String topic : topics) {
            offsets.put(topic, new long[0]);
        }
    }

    /**
     * Always {@code false}.
     * NOTE(review): presumably because the metadata lookup in
     * {@code assignPartitions()} may block for up to one second - confirm.
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * De-duplicates the topic list (logging the duplicates), remembers this
     * processor's global index and the total parallelism, and creates the
     * Kafka consumer.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        List<String> uniqueTopics = topics.stream().distinct().collect(Collectors.toList());
        if (uniqueTopics.size() != topics.size()) {
            // Remove one occurrence of each distinct topic; what remains are the surplus entries.
            List<String> duplicates = new ArrayList<>(topics);
            for (String topic : uniqueTopics) {
                duplicates.remove(topic);
            }
            getLogger().warning("Duplicate topics found in topic list: " + duplicates);
        }
        topics = uniqueTopics;
        processorIndex = context.globalProcessorIndex();
        totalParallelism = context.totalParallelism();
        consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Queries the partition count of every topic and assigns newly found
     * partitions. Does nothing until {@link #nextMetadataCheck} has passed.
     * If a metadata request times out, the check is abandoned and retried on
     * the next call because the next-check time is not advanced.
     */
    private void assignPartitions() {
        if (System.nanoTime() < nextMetadataCheck) {
            return;
        }
        for (int topicIndex = 0; topicIndex < topics.size(); topicIndex++) {
            try {
                List<?> partitionInfos = consumer.partitionsFor(topics.get(topicIndex), Duration.ofSeconds(1L));
                int partitionCount = partitionInfos == null ? 0 : partitionInfos.size();
                handleNewPartitions(topicIndex, partitionCount, false);
            } catch (TimeoutException e) {
                getLogger().warning("Unable to get partition metadata, ignoring: " + e, e);
                return;
            }
        }
        nextMetadataCheck = System.nanoTime() + METADATA_CHECK_INTERVAL_NANOS;
    }

    /**
     * Grows the bookkeeping of a topic to {@code newPartitionCount} partitions
     * and assigns those of the added partitions that this processor owns.
     * A count not larger than the known one is ignored.
     *
     * @param topicIndex        index of the topic in {@link #topics}
     * @param newPartitionCount partition count to grow to
     * @param isRestoring       {@code true} when called while restoring a snapshot;
     *                          suppresses seeking to the beginning of the new partitions
     */
    private void handleNewPartitions(int topicIndex, int newPartitionCount, boolean isRestoring) {
        String topicName = topics.get(topicIndex);
        long[] oldTopicOffsets = offsets.get(topicName);
        if (oldTopicOffsets.length >= newPartitionCount) {
            return;
        }
        // Extend the offsets array; added partitions start at the "nothing emitted" marker -1.
        long[] newTopicOffsets = Arrays.copyOf(oldTopicOffsets, newPartitionCount);
        Arrays.fill(newTopicOffsets, oldTopicOffsets.length, newTopicOffsets.length, -1L);
        offsets.put(topicName, newTopicOffsets);

        List<TopicPartition> newAssignments = new ArrayList<>();
        for (int partition = oldTopicOffsets.length; partition < newPartitionCount; partition++) {
            if (handledByThisProcessor(topicIndex, partition)) {
                TopicPartition topicPartition = new TopicPartition(topicName, partition);
                // The running size serves as the next free index in the event time mapper.
                currentAssignment.put(topicPartition, currentAssignment.size());
                newAssignments.add(topicPartition);
            }
        }
        if (newAssignments.isEmpty()) {
            return;
        }
        getLogger().info("New partition(s) assigned: " + newAssignments);
        eventTimeMapper.addPartitions(newAssignments.size());
        consumer.assign(currentAssignment.keySet());
        // Only partitions found after the topic's first discovery are rewound; on the
        // first discovery (no previously known partitions) and on restore no seek is done here.
        if (oldTopicOffsets.length > 0 && !isRestoring) {
            getLogger().info("Seeking to the beginning of newly-discovered partitions: " + newAssignments);
            consumer.seekToBeginning(newAssignments);
        }
        LoggingUtil.logFinest(getLogger(), "Currently assigned partitions: %s", currentAssignment);
    }

    /**
     * Emits what is left from the previous poll; once that is done, refreshes
     * the assignment, polls Kafka without blocking and starts emitting the
     * result.
     *
     * @return always {@code false}
     */
    @Override
    public boolean complete() {
        if (!emitFromTraverser(traverser)) {
            return false;
        }
        assignPartitions();
        ConsumerRecords<K, V> records = null;
        if (!currentAssignment.isEmpty()) {
            records = consumer.poll(Duration.ZERO);
        }
        if (isEmpty(records)) {
            traverser = eventTimeMapper.flatMapIdle();
        } else {
            traverser = Traversers.traverseIterable(records).flatMap(this::processRecord);
        }
        emitFromTraverser(traverser);
        return false;
    }

    /**
     * Records the offset of {@code record}, applies the projection and passes
     * a non-null result through the event time mapper.
     *
     * @return the items to emit for the record; empty if the projection returned {@code null}
     */
    private Traverser<Object> processRecord(ConsumerRecord<K, V> record) {
        // The offset is stored even if the record is filtered out by the projection.
        offsets.get(record.topic())[record.partition()] = record.offset();
        T projected = projectionFn.apply(record);
        if (projected == null) {
            return Traversers.empty();
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        int mapperIndex = currentAssignment.get(topicPartition);
        return eventTimeMapper.flatMapEvent(projected, mapperIndex, record.timestamp());
    }

    /** Closes the Kafka consumer, if one was created. */
    @Override
    public void close() {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (InterruptException ignored) {
                // Kafka's unchecked InterruptException (not java.lang.InterruptedException):
                // closing is best-effort, so an interruption during close is not propagated.
            }
        }
    }

    /**
     * Saves one entry per owned partition with a non-negative offset; the
     * value is {@code long[]{offset, watermark}}. The processor with global
     * index 0 appends the partition count of every topic.
     *
     * @return {@code true} once all entries were accepted by the snapshot outbox
     */
    @Override
    public boolean saveToSnapshot() {
        // Pending output must be emitted first so that the saved offsets match what was emitted.
        if (!emitFromTraverser(traverser)) {
            return false;
        }
        if (snapshotTraverser == null) {
            snapshotTraverser = Traversers.<Map.Entry<BroadcastKey<?>, ?>>traverseStream(
                    offsets.entrySet().stream()
                           .flatMap(entry -> IntStream.range(0, entry.getValue().length)
                                   .filter(partition -> entry.getValue()[partition] >= 0)
                                   .mapToObj(partition -> {
                                       TopicPartition key = new TopicPartition(entry.getKey(), partition);
                                       long offset = entry.getValue()[partition];
                                       long watermark = eventTimeMapper.getWatermark(currentAssignment.get(key));
                                       return Util.entry(BroadcastKey.broadcastKey(key),
                                               new long[]{offset, watermark});
                                   })))
                    .onFirstNull(() -> {
                        // Reset so that the next snapshot builds a fresh traverser.
                        snapshotTraverser = null;
                        if (getLogger().isFinestEnabled()) {
                            getLogger().finest("Finished saving snapshot. Saved offsets: " + offsets()
                                    + ", Saved watermarks: " + watermarks());
                        }
                    });

            if (processorIndex == 0) {
                Map.Entry<BroadcastKey<?>, ?> partitionCountsItem = Util.entry(
                        BroadcastKey.broadcastKey(PARTITION_COUNTS_SNAPSHOT_KEY),
                        topics.stream().collect(Collectors.toMap(
                                topic -> topic,
                                topic -> offsets.get(topic).length)));
                snapshotTraverser = snapshotTraverser.append(partitionCountsItem);
            }
        }
        return emitFromTraverserToSnapshot(snapshotTraverser);
    }

    /**
     * Restores one snapshot entry: either the per-topic partition counts or
     * the {@code long[]{offset, watermark}} pair of a single partition.
     * Entries for topics that are not in the topic list are ignored with a
     * warning.
     *
     * @param key0  a {@link BroadcastKey} wrapping either
     *              {@link #PARTITION_COUNTS_SNAPSHOT_KEY} or a {@link TopicPartition}
     * @param value the value saved under that key
     */
    @Override
    public void restoreFromSnapshot(@Nonnull Object key0, @Nonnull Object value) {
        Object key = ((BroadcastKey<?>) key0).key();
        if (PARTITION_COUNTS_SNAPSHOT_KEY.equals(key)) {
            @SuppressWarnings("unchecked") // written by saveToSnapshot() as Map<String, Integer>
            Map<String, Integer> partitionCounts = (Map<String, Integer>) value;
            for (Map.Entry<String, Integer> countEntry : partitionCounts.entrySet()) {
                String topicName = countEntry.getKey();
                int topicIndex = topics.indexOf(topicName);
                if (topicIndex < 0) {
                    // Same policy as for restored offsets below: skip topics no longer read,
                    // instead of failing on topics.get(-1).
                    getLogger().warning("Partition count for topic '" + topicName
                            + "' is restored from the snapshot, but the topic is not supposed to be read, ignoring");
                    continue;
                }
                handleNewPartitions(topicIndex, countEntry.getValue(), true);
            }
            return;
        }

        TopicPartition topicPartition = (TopicPartition) key;
        long[] offsetAndWatermark = (long[]) value;
        long offset = offsetAndWatermark[0];
        long watermark = offsetAndWatermark[1];
        if (!offsets.containsKey(topicPartition.topic())) {
            getLogger().warning("Offset for topic '" + topicPartition.topic()
                    + "' is restored from the snapshot, but the topic is not supposed to be read, ignoring");
            return;
        }
        int topicIndex = topics.indexOf(topicPartition.topic());
        assert topicIndex >= 0;
        // Make sure the partition is known (and assigned, if owned) before seeking.
        handleNewPartitions(topicIndex, topicPartition.partition() + 1, true);
        if (handledByThisProcessor(topicIndex, topicPartition.partition())) {
            long[] topicOffsets = offsets.get(topicPartition.topic());
            assert topicOffsets[topicPartition.partition()] < 0
                    : "duplicate offset for topicPartition '" + topicPartition + "' restored, offset1="
                    + topicOffsets[topicPartition.partition()] + ", offset2=" + offset;
            topicOffsets[topicPartition.partition()] = offset;
            // The saved offset is that of the last emitted record; resume with the next one.
            consumer.seek(topicPartition, offset + 1);
            Integer mapperIndex = currentAssignment.get(topicPartition);
            assert mapperIndex != null;
            eventTimeMapper.restoreWatermark(mapperIndex, watermark);
        }
    }

    /**
     * Logs the restored offsets and watermarks at fine level.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        if (getLogger().isFineEnabled()) {
            getLogger().fine("Finished restoring snapshot. Restored offsets: " + offsets()
                    + " and watermarks:" + watermarks());
        }
        return true;
    }

    /** Returns {@code true} for a {@code null} or empty batch. */
    private boolean isEmpty(ConsumerRecords<K, V> records) {
        return records == null || records.isEmpty();
    }

    /** Current offset of every partition assigned to this processor (for logging). */
    private Map<TopicPartition, Long> offsets() {
        return currentAssignment.keySet().stream()
                .collect(Collectors.toMap(
                        topicPartition -> topicPartition,
                        topicPartition -> offsets.get(topicPartition.topic())[topicPartition.partition()]));
    }

    /** Current watermark of every partition assigned to this processor (for logging). */
    private Map<TopicPartition, Long> watermarks() {
        return currentAssignment.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> eventTimeMapper.getWatermark(entry.getValue())));
    }

    /**
     * Returns a supplier that creates a new {@code StreamKafkaP} from the
     * given arguments on every call; see the constructor for their meaning.
     */
    @Nonnull
    public static <K, V, T> SupplierEx<Processor> processorSupplier(
            @Nonnull Properties properties,
            @Nonnull List<String> topics,
            @Nonnull FunctionEx<? super ConsumerRecord<K, V>, ? extends T> projectionFn,
            @Nonnull EventTimePolicy<? super T> eventTimePolicy
    ) {
        return () -> new StreamKafkaP<>(properties, topics, projectionFn, eventTimePolicy);
    }

    /** Tells whether the given partition of the given topic belongs to this processor. */
    private boolean handledByThisProcessor(int topicIndex, int partition) {
        return handledByThisProcessor(totalParallelism, offsets.size(), processorIndex, topicIndex, partition);
    }

    /**
     * Decides partition ownership: partitions of one topic are dealt
     * round-robin over the processors, and each topic starts at a different
     * processor (shifted by {@code max(1, totalParallelism / topicsCount)} per
     * topic index).
     *
     * @param totalParallelism total number of processor instances
     * @param topicsCount      number of topics being read; must not be 0
     * @param processorIndex   global index of the processor being asked about
     * @param topicIndex       index of the topic
     * @param partition        partition number within the topic
     * @return {@code true} if the processor owns the partition
     */
    static boolean handledByThisProcessor(
            int totalParallelism, int topicsCount, int processorIndex, int topicIndex, int partition
    ) {
        int startIndex = topicIndex * Math.max(1, totalParallelism / topicsCount);
        int topicPartitionHandledBy = (startIndex + partition) % totalParallelism;
        return topicPartitionHandledBy == processorIndex;
    }
}
