package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.impl.ProviderHelper;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.Timer;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.Util;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/ReadKafkaConnectP.class */
/**
 * Jet source processor that polls {@link SourceRecord}s from a {@link TaskRunner} created by a
 * shared {@link ConnectorWrapper}, maps each record with a user-supplied projection function and
 * emits the results through an {@link EventTimeMapper}.
 *
 * <p>The processor is non-cooperative, never reports completion from {@link #complete()}, and
 * pauses polling between {@link #saveToSnapshot()} and {@link #snapshotCommitFinish(boolean)}.
 * It also publishes poll statistics as dynamic metrics.
 *
 * @param <T> type of the items produced by the projection function
 */
public class ReadKafkaConnectP<T> extends AbstractProcessor implements DynamicMetricsProvider {
    private static final String METRICS_PREFIX = "kafka.connect";
    private static final String TASK_RUNNER_TAG = "task.runner";
    private static final String TASKS_MAX_PROPERTY = "tasks.max";
    private static final String SNAPSHOT_KEY_PREFIX = "snapshot-";

    private final ConnectorWrapper connectorWrapper;
    private final EventTimeMapper<T> eventTimeMapper;
    private final FunctionEx<SourceRecord, T> projectionFn;
    private final LocalKafkaConnectStatsImpl localKafkaConnectStats = new LocalKafkaConnectStatsImpl();
    private TaskRunner taskRunner;
    /** Registry this processor registered itself with in {@code init}; kept so {@link #close()} can undo it. */
    private MetricsRegistry metricsRegistry;
    private boolean snapshotInProgress;
    private Traverser<Map.Entry<BroadcastKey<String>, State>> snapshotTraverser;
    private boolean snapshotsEnabled;
    private int processorIndex;
    private Traverser<?> traverser = Traversers.empty();

    /**
     * Creates the processor.
     *
     * @param connectorWrapper wrapper used to create this processor's task runner; must not be null
     * @param eventTimePolicy  policy handed to the internal {@link EventTimeMapper}; must not be null
     * @param functionEx       projection applied to every polled record; must not be null
     */
    public ReadKafkaConnectP(@Nonnull ConnectorWrapper connectorWrapper, @Nonnull EventTimePolicy<? super T> eventTimePolicy, @Nonnull FunctionEx<SourceRecord, T> functionEx) {
        Preconditions.checkNotNull(connectorWrapper, "connectorWrapper is required");
        Preconditions.checkNotNull(eventTimePolicy, "eventTimePolicy is required");
        Preconditions.checkNotNull(functionEx, "projectionFn is required");
        this.connectorWrapper = connectorWrapper;
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.projectionFn = functionEx;
        // All records of this processor are mapped to a single partition (index 0).
        this.eventTimeMapper.addPartitions(1);
    }

    /**
     * Creates the task runner, captures snapshot settings and the global processor index, and
     * registers this processor as a dynamic metrics provider.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.taskRunner = this.connectorWrapper.createTaskRunner();
        this.snapshotsEnabled = context.snapshottingEnabled();
        this.processorIndex = context.globalProcessorIndex();
        // Keep the registry so the registration can be reverted in close().
        this.metricsRegistry = Util.getNodeEngine(context.hazelcastInstance()).getMetricsRegistry();
        this.metricsRegistry.registerDynamicMetricsProvider(this);
    }

    /** @return always {@code false}; this processor declares itself non-cooperative */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Emits pending items; once they are all emitted and no snapshot is in progress, polls the
     * task runner for more records and starts emitting them.
     *
     * @return always {@code false}, i.e. the source never reports completion
     */
    @Override
    public boolean complete() {
        if (this.snapshotInProgress || !emitFromTraverser(this.traverser)) {
            return false;
        }
        long pollStart = Timer.nanos();
        List<SourceRecord> records = this.taskRunner.poll();
        this.localKafkaConnectStats.addSourceRecordPollDuration(Duration.ofNanos(Timer.nanosElapsed(pollStart)));
        this.localKafkaConnectStats.incrementSourceRecordPoll(records.size());
        if (records.isEmpty()) {
            this.traverser = this.eventTimeMapper.flatMapIdle();
        } else {
            this.traverser = Traversers.traverseIterable(records).flatMap(sourceRecord -> {
                Long timestamp = sourceRecord.timestamp();
                long eventTime = timestamp == null ? EventTimeMapper.NO_NATIVE_TIME : timestamp;
                T projected = this.projectionFn.apply(sourceRecord);
                // The record is reported to the task runner only after it was projected successfully.
                this.taskRunner.commitRecord(sourceRecord);
                return this.eventTimeMapper.flatMapEvent(projected, 0, eventTime);
            });
        }
        emitFromTraverser(this.traverser);
        return false;
    }

    /**
     * Saves the task runner's state under this processor's broadcast key. Marks a snapshot as in
     * progress, which pauses {@link #complete()} until {@link #snapshotCommitFinish(boolean)}.
     *
     * @return {@code true} if snapshots are disabled or the state was fully written to the snapshot
     */
    @Override
    public boolean saveToSnapshot() {
        if (!this.snapshotsEnabled) {
            return true;
        }
        this.snapshotInProgress = true;
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.singleton(com.hazelcast.jet.Util.entry(snapshotKey(), this.taskRunner.createSnapshot())).onFirstNull(() -> {
                this.snapshotTraverser = null;
                getLogger().finest("Finished saving snapshot");
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /** Builds the snapshot key of this processor from its global processor index. */
    private BroadcastKey<String> snapshotKey() {
        return BroadcastKey.broadcastKey(SNAPSHOT_KEY_PREFIX + this.processorIndex);
    }

    /**
     * Restores the task runner state if the entry belongs to this processor; entries saved under
     * other keys are ignored.
     *
     * @param obj  snapshot key
     * @param obj2 snapshot value; cast to {@link State} when the key matches
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (snapshotKey().equals(obj)) {
            this.taskRunner.restoreSnapshot((State) obj2);
        }
    }

    /**
     * Commits the task runner when the snapshot succeeded and resumes polling.
     *
     * @param z {@code true} if the snapshot was successful
     * @return always {@code true}
     */
    @Override
    public boolean snapshotCommitFinish(boolean z) {
        try {
            if (z) {
                this.taskRunner.commit();
            }
        } finally {
            // Always clear the flag: leaving it set after a failed snapshot would keep complete()
            // from emitting or polling until some later snapshot happened to succeed.
            this.snapshotInProgress = false;
        }
        return true;
    }

    /** Stops the task runner (if one was created) and deregisters the metrics provider. */
    @Override
    public void close() {
        try {
            if (this.taskRunner != null) {
                this.taskRunner.stop();
            }
        } finally {
            // Undo the registration from init() so the registry does not keep a closed processor.
            if (this.metricsRegistry != null) {
                this.metricsRegistry.deregisterDynamicMetricsProvider(this);
                this.metricsRegistry = null;
            }
        }
    }

    /**
     * @return a new map holding this processor's statistics keyed by the task runner name, or an
     *         empty map if the task runner has not been created yet
     */
    public Map<String, LocalKafkaConnectStats> getStats() {
        Map<String, LocalKafkaConnectStats> stats = new HashMap<>();
        if (this.taskRunner != null) {
            stats.put(this.taskRunner.getName(), this.localKafkaConnectStats);
        }
        return stats;
    }

    /**
     * Publishes the statistics from {@link #getStats()} under the {@code kafka.connect} prefix,
     * tagged with the task runner name when a task runner exists.
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor metricDescriptor, MetricsCollectionContext metricsCollectionContext) {
        MetricDescriptor descriptor = metricDescriptor;
        if (this.taskRunner != null) {
            // withTag is applied to a copy, so the tagged copy itself must be passed on;
            // previously the result was discarded and the tag never reached the metrics.
            descriptor = metricDescriptor.copy().withTag(TASK_RUNNER_TAG, this.taskRunner.getName());
        }
        ProviderHelper.provide(descriptor, metricsCollectionContext, METRICS_PREFIX, getStats());
    }

    /**
     * Creates a supplier of {@link ReadKafkaConnectP} processors that share one
     * {@link ConnectorWrapper} per supplier instance.
     *
     * @param properties      connector properties; {@code tasks.max} is overwritten with the local
     *                        parallelism when the supplier is initialized
     * @param eventTimePolicy event time policy passed to every created processor
     * @param functionEx      projection passed to every created processor
     * @param <T>             type of the items emitted by the processors
     * @return the processor supplier
     */
    public static <T> ProcessorSupplier processSupplier(@Nonnull final Properties properties, @Nonnull final EventTimePolicy<? super T> eventTimePolicy, @Nonnull final FunctionEx<SourceRecord, T> functionEx) {
        return new ProcessorSupplier() {
            private transient ConnectorWrapper connectorWrapper;

            @Override
            public void init(@Nonnull ProcessorSupplier.Context context) {
                properties.put(TASKS_MAX_PROPERTY, Integer.toString(context.localParallelism()));
                this.connectorWrapper = new ConnectorWrapper(properties);
            }

            @Override
            public void close(@Nullable Throwable th) {
                if (this.connectorWrapper != null) {
                    this.connectorWrapper.stop();
                }
            }

            @Nonnull
            @Override
            public Collection<? extends Processor> get(int i) {
                return IntStream.range(0, i)
                        .mapToObj(index -> new ReadKafkaConnectP<T>(this.connectorWrapper, eventTimePolicy, functionEx))
                        .collect(Collectors.toList());
            }
        };
    }
}
