package org.apache.beam.sdk.io.kafka;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaCommitOffset.class */
public class KafkaCommitOffset<K, V> extends PTransform<PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>, PCollection<Void>> {
    private final KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors;
    private final boolean use259implementation;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaCommitOffset$CommitOffsetDoFn.class */
    /**
     * Commits offsets back to Kafka.
     *
     * <p>Each input element pairs a {@link KafkaSourceDescriptor} with an offset. For every element a
     * consumer is created through the configured factory, {@code offset + 1} is committed
     * synchronously for the descriptor's topic partition, and the consumer is closed again.
     * Commit failures are logged and otherwise ignored, so a failed commit does not fail the bundle.
     */
    public static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {
        private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class);
        private final Map<String, Object> consumerConfig;
        private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;

        /**
         * @param readSourceDescriptors supplies the base consumer configuration and the consumer factory
         */
        CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors<?, ?> readSourceDescriptors) {
            this.consumerConfig = readSourceDescriptors.getConsumerConfig();
            this.consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
        }

        /**
         * Commits {@code kv.getValue() + 1} for the topic partition of {@code kv.getKey()}.
         *
         * @param kv source descriptor and the offset paired with it
         * @throws IllegalStateException if no bootstrap servers are available (see
         *     {@link #overrideBootstrapServersConfig})
         */
        @DoFn.ProcessElement
        @DoFn.RequiresStableInput
        public void processElement(@DoFn.Element KV<KafkaSourceDescriptor, Long> kv) {
            KafkaSourceDescriptor descriptor = kv.getKey();
            Map<String, Object> effectiveConfig = overrideBootstrapServersConfig(this.consumerConfig, descriptor);
            // try-with-resources closes the consumer on every path, including when the commit throws.
            try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(effectiveConfig)) {
                try {
                    consumer.commitSync(
                            Collections.singletonMap(
                                    descriptor.getTopicPartition(),
                                    new OffsetAndMetadata(kv.getValue() + 1)));
                } catch (Exception e) {
                    // Best effort: a failed commit is only logged. The exception is passed as the
                    // trailing argument so the stack trace is kept alongside the message.
                    LOG.warn("Getting exception when committing offset: {}", e.getMessage(), e);
                }
            }
        }

        /**
         * Returns a copy of {@code map} in which {@code bootstrap.servers} is replaced by the
         * descriptor's servers (comma-joined) when the descriptor carries a non-empty list.
         *
         * @param map base consumer configuration; never modified
         * @param kafkaSourceDescriptor descriptor that may carry its own bootstrap servers
         * @return a new mutable map with the effective configuration
         * @throws IllegalStateException if {@code map} has no {@code bootstrap.servers} entry and the
         *     descriptor's bootstrap servers are {@code null}
         */
        private Map<String, Object> overrideBootstrapServersConfig(Map<String, Object> map, KafkaSourceDescriptor kafkaSourceDescriptor) {
            Preconditions.checkState(
                    map.containsKey("bootstrap.servers") || kafkaSourceDescriptor.getBootStrapServers() != null,
                    "bootstrap.servers must be set in the consumer config or on the KafkaSourceDescriptor");
            Map<String, Object> config = new HashMap<>(map);
            if (kafkaSourceDescriptor.getBootStrapServers() != null && !kafkaSourceDescriptor.getBootStrapServers().isEmpty()) {
                config.put("bootstrap.servers", String.join(",", kafkaSourceDescriptor.getBootStrapServers()));
            }
            return config;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaCommitOffset$MaxOffsetFn.class */
    /**
     * Reduces each bundle to one element per {@link KafkaSourceDescriptor}: the largest record offset
     * seen for that descriptor in the bundle, emitted with the timestamp of the record that carried
     * it.
     */
    public static final class MaxOffsetFn<K, V> extends DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, KV<KafkaSourceDescriptor, Long>> {
        /** Per-bundle maxima; transient, so it is created lazily in {@link #startBundle()}. */
        private transient Map<KafkaSourceDescriptor, OffsetAndTimestamp> maxObserved;

        /** Mutable holder for the highest offset seen so far and the timestamp that came with it. */
        public static class OffsetAndTimestamp {
            long offset;
            Instant timestamp;

            OffsetAndTimestamp(long offset, Instant timestamp) {
                this.offset = offset;
                this.timestamp = timestamp;
            }

            /** Adopts the candidate pair only when its offset is strictly greater than the stored one. */
            void merge(long candidateOffset, Instant candidateTimestamp) {
                if (candidateOffset <= this.offset) {
                    return;
                }
                this.offset = candidateOffset;
                this.timestamp = candidateTimestamp;
            }
        }

        private MaxOffsetFn() {
        }

        /** Starts every bundle with an empty map, reusing the existing instance when there is one. */
        @DoFn.StartBundle
        public void startBundle() {
            if (this.maxObserved != null) {
                this.maxObserved.clear();
                return;
            }
            this.maxObserved = new HashMap<>();
        }

        /**
         * Folds one record's offset into the running maximum for its descriptor.
         *
         * @param kv source descriptor and the record read from it
         * @param instant element timestamp, remembered when this record sets a new maximum
         */
        @DoFn.ProcessElement
        @DoFn.RequiresStableInput
        public void processElement(@DoFn.Element KV<KafkaSourceDescriptor, KafkaRecord<K, V>> kv, @DoFn.Timestamp Instant instant) {
            KafkaSourceDescriptor descriptor = kv.getKey();
            long recordOffset = kv.getValue().getOffset();
            OffsetAndTimestamp seen = this.maxObserved.get(descriptor);
            if (seen == null) {
                this.maxObserved.put(descriptor, new OffsetAndTimestamp(recordOffset, instant));
            } else {
                seen.merge(recordOffset, instant);
            }
        }

        /**
         * Emits the collected maxima, one element per descriptor, into the global window using the
         * timestamp stored with each maximum.
         */
        @DoFn.FinishBundle
        public void finishBundle(DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, KV<KafkaSourceDescriptor, Long>>.FinishBundleContext finishBundleContext) {
            for (Map.Entry<KafkaSourceDescriptor, OffsetAndTimestamp> entry : this.maxObserved.entrySet()) {
                OffsetAndTimestamp max = entry.getValue();
                finishBundleContext.output(KV.of(entry.getKey(), Long.valueOf(max.offset)), max.timestamp, GlobalWindow.INSTANCE);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the offset-committing transform.
     *
     * @param readSourceDescriptors read configuration that {@link #expand} passes on to
     *     {@link CommitOffsetDoFn}
     * @param z selects the pipeline shape built by {@link #expand}: {@code true} maps every record
     *     to its offset, {@code false} first reduces each bundle with {@link MaxOffsetFn}
     */
    public KafkaCommitOffset(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, boolean z) {
        this.use259implementation = z;
        this.readSourceDescriptors = readSourceDescriptors;
    }

    /**
     * Builds the commit pipeline: extract an offset per source descriptor, take the per-key maximum
     * over five-minute fixed windows, and commit that maximum through {@link CommitOffsetDoFn}.
     *
     * <p>When {@code use259implementation} is set, every record is mapped to its offset directly;
     * otherwise {@link MaxOffsetFn} first reduces each bundle to one maximum per descriptor.
     *
     * @param pCollection records keyed by the source descriptor they were read from
     * @return the (void) output of the committing step
     * @throws RuntimeException if no schema coder is registered for {@link KafkaSourceDescriptor};
     *     the original {@link NoSuchSchemaException} is attached as the cause
     */
    @Override
    public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> pCollection) {
        try {
            PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
            if (this.use259implementation) {
                offsets =
                        pCollection.apply(
                                MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
                                        .via(element -> KV.of(element.getKey(), element.getValue().getOffset())));
            } else {
                offsets = pCollection.apply(ParDo.of(new MaxOffsetFn<K, V>()));
            }
            return offsets
                    .setCoder(
                            KvCoder.of(
                                    pCollection.getPipeline().getSchemaRegistry().getSchemaCoder(KafkaSourceDescriptor.class),
                                    VarLongCoder.of()))
                    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L))))
                    .apply(Max.longsPerKey())
                    .apply(ParDo.of(new CommitOffsetDoFn(this.readSourceDescriptors)))
                    .setCoder(VoidCoder.of());
        } catch (NoSuchSchemaException e) {
            // Chain the cause so the schema-lookup stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /*
     * Lambda deserialization hook, marked synthetic in the decompiled output.
     *
     * Recognises a serialized lambda whose implementation method is "lambda$expand$b372281d$1" and
     * whose functional interface is SerializableFunction.apply, and returns a function that maps a
     * KV of (source descriptor, record) to a KV of (source descriptor, record offset). Any other
     * serialized lambda is rejected with IllegalArgumentException.
     *
     * NOTE(review): this body is decompiler output and is not valid Java as written (a boolean is
     * initialised with -1 and switched on, and a lambda is returned where Object is expected).
     * Presumably the compiler regenerates this method for the serializable lambda in expand, so a
     * hand-maintained copy should not be needed -- confirm before building from this source.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name to a case selector via its hash code.
        switch (implMethodName.hashCode()) {
            case -2095434817:
                if (implMethodName.equals("lambda$expand$b372281d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the remaining lambda metadata before re-creating the function.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaCommitOffset") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;)Lorg/apache/beam/sdk/values/KV;")) {
                    return kv -> {
                        return KV.of((KafkaSourceDescriptor) kv.getKey(), Long.valueOf(((KafkaRecord) kv.getValue()).getOffset()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
