package io.micronaut.configuration.kafka.intercept;

import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaTimestamp;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaProducerConfiguration;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.messaging.annotation.Header;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.class */
public class KafkaClientIntroductionAdvice implements MethodInterceptor<Object, Object>, AutoCloseable {
    // Class-wide SLF4J logger used for trace/debug/warn output of this advice.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class);
    // Used to look up producer configuration beans and to create Producer instances (see getProducer).
    private final BeanContext beanContext;
    // Supplies the serializers for header arguments, keys and values.
    private final SerdeRegistry serdeRegistry;
    // Converts the result of a send (e.g. the record metadata) to the type declared by the client method.
    private final ConversionService<?> conversionService;
    // Producers created so far, keyed by key type, value type and client id; filled lazily by
    // getProducer and emptied by close().
    private final Map<ProducerKey, Producer> producerMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice$ProducerKey.class */
    public final class ProducerKey {
        final Class keyType;
        final Class valueType;
        final String id;

        ProducerKey(Class cls, Class cls2, String str) {
            this.keyType = cls;
            this.valueType = cls2;
            this.id = str;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProducerKey producerKey = (ProducerKey) obj;
            return Objects.equals(this.keyType, producerKey.keyType) && Objects.equals(this.valueType, producerKey.valueType) && Objects.equals(this.id, producerKey.id);
        }

        public int hashCode() {
            return Objects.hash(this.keyType, this.valueType, this.id);
        }
    }

    /**
     * Creates the advice with the collaborators it needs to build producers and convert results.
     *
     * @param beanContext       context used to resolve producer configuration and create producers
     * @param serdeRegistry     registry used to pick serializers for keys, values and headers
     * @param conversionService service used to convert send results to the declared return type
     */
    public KafkaClientIntroductionAdvice(BeanContext beanContext, SerdeRegistry serdeRegistry, ConversionService<?> conversionService) {
        this.conversionService = conversionService;
        this.serdeRegistry = serdeRegistry;
        this.beanContext = beanContext;
    }

    /**
     * Intercepts a call to a method annotated with {@code @KafkaClient} and sends its arguments to Kafka.
     *
     * <p>The method arguments are bound as follows: a {@code ProducerRecord}-typed or {@code @Body}
     * argument is the record value, {@code @KafkaKey} the record key, {@code @Topic} overrides the
     * topic, {@code @KafkaTimestamp} (when a {@code Long}) the timestamp, {@code @Header} arguments
     * and Kafka header collections/{@code Headers} contribute record headers. When no value argument
     * was identified, the first argument without a {@code Bindable} stereotype is used.</p>
     *
     * <p>How the send is performed depends on the declared return type: a type convertible to a
     * publisher yields a (converted) {@link Flowable}; a {@link Future} yields a
     * {@link CompletableFuture}; anything else blocks until the send has completed.</p>
     *
     * @param methodInvocationContext the invocation being intercepted
     * @return the result of proceeding when the method is not a Kafka client method; otherwise the
     *         publisher, future or converted send result matching the declared return type
     * @throws MessagingClientException if no body argument or topic can be determined, a header
     *                                  argument cannot be serialized, or a blocking send fails
     */
    public final Object intercept(final MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (!methodInvocationContext.hasAnnotation(KafkaClient.class)) {
            // Not a Kafka client method: hand over to the rest of the chain.
            return methodInvocationContext.proceed();
        }
        AnnotationValue<KafkaClient> clientAnnotation = methodInvocationContext.findAnnotation(KafkaClient.class).orElseThrow(() ->
                new IllegalStateException("No @KafkaClient annotation present on method: " + methodInvocationContext));
        boolean batchSend = clientAnnotation.isTrue("batch");

        // Static headers declared through @Header annotations on the invocation context.
        List<org.apache.kafka.common.header.Header> kafkaHeaders = new ArrayList<>();
        for (AnnotationValue<Header> headerAnnotation : methodInvocationContext.getAnnotationValuesByType(Header.class)) {
            String staticName = headerAnnotation.stringValue("name").orElse(null);
            String staticValue = headerAnnotation.stringValue().orElse(null);
            if (StringUtils.isNotEmpty(staticName) && StringUtils.isNotEmpty(staticValue)) {
                kafkaHeaders.add(new RecordHeader(staticName, staticValue.getBytes(StandardCharsets.UTF_8)));
            }
        }

        // Bind each method argument to its role in the outgoing record.
        String resolvedTopic = methodInvocationContext.stringValue(Topic.class).orElse(null);
        Argument<?> resolvedKeyArgument = null;
        Argument<?> resolvedBodyArgument = null;
        Object resolvedKey = null;
        Object resolvedValue = null;
        Long explicitTimestamp = null;
        Argument[] arguments = methodInvocationContext.getArguments();
        Object[] parameterValues = methodInvocationContext.getParameterValues();
        for (int i = 0; i < arguments.length; i++) {
            Argument<?> argument = arguments[i];
            Object parameterValue = parameterValues[i];
            if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(Body.class)) {
                resolvedBodyArgument = argument;
                resolvedValue = parameterValue;
            } else if (argument.isAnnotationPresent(KafkaKey.class)) {
                resolvedKeyArgument = argument;
                resolvedKey = parameterValue;
            } else if (argument.isAnnotationPresent(Topic.class)) {
                // A null topic argument keeps the topic declared through the annotation.
                if (parameterValue != null) {
                    resolvedTopic = parameterValue.toString();
                }
            } else if (argument.isAnnotationPresent(KafkaTimestamp.class)) {
                if (parameterValue instanceof Long) {
                    explicitTimestamp = (Long) parameterValue;
                }
            } else if (argument.isAnnotationPresent(Header.class)) {
                AnnotationMetadata headerMetadata = argument.getAnnotationMetadata();
                String argumentName = argument.getName();
                // Header name: the "name" member, else the annotation value, else the argument name.
                String headerName = headerMetadata.stringValue(Header.class, "name")
                        .orElseGet(() -> headerMetadata.stringValue(Header.class).orElse(argumentName));
                if (parameterValue != null) {
                    Serializer headerSerializer = this.serdeRegistry.pickSerializer(argument);
                    if (headerSerializer != null) {
                        try {
                            kafkaHeaders.add(new RecordHeader(headerName, headerSerializer.serialize(null, parameterValue)));
                        } catch (Exception e) {
                            throw new MessagingClientException("Cannot serialize header argument [" + argument + "] for method [" + methodInvocationContext + "]: " + e.getMessage(), e);
                        }
                    }
                }
            } else if (argument.isContainerType() && org.apache.kafka.common.header.Header.class.isAssignableFrom(argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType())) {
                @SuppressWarnings("unchecked") // element type was checked against Kafka's Header just above
                Collection<? extends org.apache.kafka.common.header.Header> headerCollection =
                        (Collection<? extends org.apache.kafka.common.header.Header>) parameterValue;
                if (headerCollection != null) {
                    kafkaHeaders.addAll(headerCollection);
                }
            } else {
                Class<?> argumentType = argument.getType();
                if ((argumentType == Headers.class || argumentType == RecordHeaders.class) && parameterValue != null) {
                    ((Headers) parameterValue).forEach(kafkaHeaders::add);
                }
            }
        }

        if (resolvedBodyArgument == null) {
            // No explicit body: fall back to the first argument that is not bound to something else.
            for (int i = 0; i < arguments.length; i++) {
                Argument<?> candidate = arguments[i];
                if (!candidate.getAnnotationMetadata().hasStereotype(Bindable.class)) {
                    resolvedBodyArgument = candidate;
                    resolvedValue = parameterValues[i];
                    break;
                }
            }
            if (resolvedBodyArgument == null) {
                throw new MessagingClientException("No valid message body argument found for method: " + methodInvocationContext);
            }
        }

        // Immutable views of the binding result so they can be captured by the callbacks below.
        final String topic = resolvedTopic;
        final Argument<?> keyArgument = resolvedKeyArgument;
        final Argument<?> bodyArgument = resolvedBodyArgument;
        final Object key = resolvedKey;
        final Object value = resolvedValue;

        ReturnType<Object> returnType = methodInvocationContext.getReturnType();
        Class<?> javaReturnType = returnType.getType();
        Producer producer = getProducer(bodyArgument, keyArgument, methodInvocationContext);
        // Long.valueOf keeps both operands boxed so a null explicit timestamp is not unboxed.
        Long timestamp = clientAnnotation.isTrue("timestamp") ? Long.valueOf(System.currentTimeMillis()) : explicitTimestamp;
        boolean returnsPublisher = Publishers.isConvertibleToPublisher(javaReturnType);
        Duration maxBlock = methodInvocationContext.getValue(KafkaClient.class, "maxBlock", Duration.class).orElse(null);
        boolean valueIsPublisher = value != null && Publishers.isConvertibleToPublisher(value.getClass());
        if (StringUtils.isEmpty(topic)) {
            throw new MessagingClientException("No topic specified for method: " + methodInvocationContext);
        }

        if (returnsPublisher) {
            Flowable<Object> sendFlowable;
            if (valueIsPublisher) {
                sendFlowable = buildSendFlowable(methodInvocationContext, topic, producer, kafkaHeaders, returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT), key, value, timestamp, maxBlock);
            } else if (batchSend) {
                // In batch mode every element of an array/Iterable body becomes its own record.
                Object batch = (value != null && value.getClass().isArray()) ? Arrays.asList((Object[]) value) : value;
                @SuppressWarnings("unchecked") // elements are only ever read as Object
                Flowable<Object> batchItems = batch instanceof Iterable
                        ? Flowable.fromIterable((Iterable<Object>) batch)
                        : Flowable.just(batch);
                sendFlowable = batchItems.flatMap(item ->
                        buildSendFlowable(methodInvocationContext, topic, bodyArgument, producer, kafkaHeaders, returnType, key, item, timestamp));
            } else {
                sendFlowable = buildSendFlowable(methodInvocationContext, topic, bodyArgument, producer, kafkaHeaders, returnType, key, value, timestamp);
            }
            return Publishers.convertPublisher(sendFlowable, javaReturnType);
        }

        if (Future.class.isAssignableFrom(javaReturnType)) {
            Optional<Argument<?>> futureTypeVariable = returnType.getFirstTypeVariable();
            final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            if (valueIsPublisher) {
                Flowable<?> sendFlowable = buildSendFlowable(methodInvocationContext, topic, producer, kafkaHeaders, futureTypeVariable.orElse(Argument.of(RecordMetadata.class)), key, value, timestamp, maxBlock);
                if (!Publishers.isSingle(value.getClass())) {
                    // Multi-element publisher: complete the future with the list of all results.
                    sendFlowable = sendFlowable.toList().toFlowable();
                }
                sendFlowable.subscribe(new Subscriber<Object>() {
                    private boolean completed = false;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(1L);
                    }

                    @Override
                    public void onNext(Object result) {
                        completableFuture.complete(result);
                        this.completed = true;
                    }

                    @Override
                    public void onError(Throwable th) {
                        completableFuture.completeExceptionally(KafkaClientIntroductionAdvice.this.wrapException(methodInvocationContext, th));
                    }

                    @Override
                    public void onComplete() {
                        // An empty publisher still has to complete the future.
                        if (!this.completed) {
                            completableFuture.complete(null);
                        }
                    }
                });
            } else {
                ProducerRecord producerRecord = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("@KafkaClient method [" + methodInvocationContext + "] Sending producer record: " + producerRecord);
                }
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        completableFuture.completeExceptionally(wrapException(methodInvocationContext, exception));
                        return;
                    }
                    if (!futureTypeVariable.isPresent()) {
                        completableFuture.complete(null);
                        return;
                    }
                    Argument<?> resultArgument = futureTypeVariable.get();
                    Optional<?> converted = this.conversionService.convert(metadata, resultArgument);
                    if (converted.isPresent()) {
                        completableFuture.complete(converted.get());
                    } else if (resultArgument.getType() == bodyArgument.getType()) {
                        completableFuture.complete(value);
                    } else {
                        // Nothing sensible to return: complete with null (as the blocking path does)
                        // rather than leaving the caller waiting on a future that never completes.
                        completableFuture.complete(null);
                    }
                });
            }
            return completableFuture;
        }

        // Blocking send: the caller expects a plain value (or void).
        Argument<Object> returnTypeArgument = returnType.asArgument();
        if (valueIsPublisher) {
            Flowable<Object> sendFlowable = buildSendFlowable(methodInvocationContext, topic, producer, kafkaHeaders, returnTypeArgument, key, value, timestamp, maxBlock);
            if (Iterable.class.isAssignableFrom(javaReturnType)) {
                return this.conversionService.convert(sendFlowable.toList().blockingGet(), returnTypeArgument).orElse(null);
            }
            if (Void.TYPE.isAssignableFrom(javaReturnType)) {
                // Goes through a Maybe so that an empty result yields null instead of an exception.
                return sendFlowable.firstElement().blockingGet();
            }
            return this.conversionService.convert(sendFlowable.blockingFirst(), returnTypeArgument).orElse(null);
        }
        try {
            if (!batchSend) {
                ProducerRecord producerRecord = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("@KafkaClient method [" + methodInvocationContext + "] Sending producer record: " + producerRecord);
                }
                Object result = maxBlock != null
                        ? producer.send(producerRecord).get(maxBlock.toMillis(), TimeUnit.MILLISECONDS)
                        : producer.send(producerRecord).get();
                // If the metadata cannot be converted, echo the body back when the types line up.
                return this.conversionService.convert(result, returnTypeArgument)
                        .orElseGet(() -> javaReturnType == bodyArgument.getType() ? value : null);
            }
            Iterable<?> batchItems;
            if (value != null && value.getClass().isArray()) {
                batchItems = Arrays.asList((Object[]) value);
            } else if (value instanceof Iterable) {
                batchItems = (Iterable<?>) value;
            } else {
                batchItems = Collections.singletonList(value);
            }
            List<Object> results = new ArrayList<>();
            for (Object item : batchItems) {
                ProducerRecord producerRecord = buildProducerRecord(topic, kafkaHeaders, key, item, timestamp);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("@KafkaClient method [" + methodInvocationContext + "] Sending producer record: " + producerRecord);
                }
                results.add(maxBlock != null
                        ? producer.send(producerRecord).get(maxBlock.toMillis(), TimeUnit.MILLISECONDS)
                        : producer.send(producerRecord).get());
            }
            return this.conversionService.convert(results, returnTypeArgument)
                    .orElseGet(() -> javaReturnType == bodyArgument.getType() ? value : null);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw wrapException(methodInvocationContext, e);
        }
    }

    /**
     * Closes every producer created by this advice and empties the producer cache.
     *
     * <p>A failure to close one producer is logged at warn level and does not prevent the
     * remaining producers from being closed; the cache is cleared in all cases.</p>
     */
    @Override // java.lang.AutoCloseable
    @PreDestroy
    public final void close() {
        try {
            for (Producer kafkaProducer : this.producerMap.values()) {
                try {
                    kafkaProducer.close();
                } catch (Exception e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Error closing Kafka producer: " + e.getMessage(), e);
                    }
                }
            }
        } finally {
            this.producerMap.clear();
        }
    }

    /**
     * Builds a {@link Flowable} that sends a single record when subscribed to.
     *
     * <p>On success the flowable emits the record metadata converted to the first type variable of
     * the return type; if that conversion yields nothing and the type variable's type equals the
     * body argument type, the sent value itself is emitted. It then completes. On failure it
     * signals the wrapped exception.</p>
     *
     * @param methodInvocationContext the intercepted invocation, used for error reporting
     * @param str        the topic
     * @param argument   the body argument
     * @param producer   the producer to send with
     * @param list       the record headers
     * @param returnType the declared return type of the client method
     * @param obj        the record key
     * @param obj2       the record value
     * @param l          the record timestamp, possibly {@code null}
     * @return a flowable performing the send
     */
    private Flowable buildSendFlowable(MethodInvocationContext<Object, Object> methodInvocationContext, String str, Argument argument, Producer producer, List<org.apache.kafka.common.header.Header> list, ReturnType<Object> returnType, Object obj, Object obj2, Long l) {
        final ProducerRecord outgoing = buildProducerRecord(str, list, obj, obj2, l);
        final Optional emittedTypeVariable = returnType.getFirstTypeVariable();
        return Flowable.create(emitter -> {
            producer.send(outgoing, (metadata, failure) -> {
                if (failure == null) {
                    if (emittedTypeVariable.isPresent()) {
                        Argument wanted = (Argument) emittedTypeVariable.get();
                        Optional converted = this.conversionService.convert(metadata, wanted);
                        if (converted.isPresent()) {
                            emitter.onNext(converted.get());
                        } else if (wanted.getType() == argument.getType()) {
                            // Metadata is not convertible: hand back the value that was sent.
                            emitter.onNext(obj2);
                        }
                    }
                    emitter.onComplete();
                } else {
                    emitter.onError(wrapException(methodInvocationContext, failure));
                }
            });
        }, BackpressureStrategy.ERROR);
    }

    /**
     * Builds a {@link Flowable} that sends one record per element of a publisher-typed body.
     *
     * <p>For each element, the resulting flowable emits the record metadata when the target type is
     * assignable from it, the element itself when it is an instance of the target type, or otherwise
     * the metadata converted to the target type (nothing if the conversion fails). When the
     * requested type is an {@code Iterable}, its first type variable is used as the target type.</p>
     *
     * @param methodInvocationContext the intercepted invocation, used for logging and error reporting
     * @param str      the topic
     * @param producer the producer to send with
     * @param list     the record headers
     * @param argument the type the caller wants emitted
     * @param obj      the record key
     * @param obj2     the publisher (or publisher-convertible object) supplying the record values
     * @param l        the record timestamp, possibly {@code null}
     * @param duration optional timeout applied to the resulting flowable, possibly {@code null}
     * @return a flowable performing the sends
     */
    private Flowable<Object> buildSendFlowable(MethodInvocationContext<Object, Object> methodInvocationContext, String str, Producer producer, List<org.apache.kafka.common.header.Header> list, Argument<?> argument, Object obj, Object obj2, Long l, Duration duration) {
        Flowable valueFlowable = (Flowable) Publishers.convertPublisher(obj2, Flowable.class);
        Class requestedType = argument.getType();
        final Class emittedType = Iterable.class.isAssignableFrom(requestedType)
                ? ((Argument) argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT)).getType()
                : requestedType;
        Flowable<Object> sends = valueFlowable.flatMap(element -> {
            ProducerRecord outgoing = buildProducerRecord(str, list, obj, element, l);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [" + methodInvocationContext + "] Sending producer record: " + outgoing);
            }
            return Flowable.create(emitter -> {
                producer.send(outgoing, (metadata, failure) -> {
                    if (failure != null) {
                        emitter.onError(wrapException(methodInvocationContext, failure));
                        return;
                    }
                    if (RecordMetadata.class.isAssignableFrom(emittedType)) {
                        emitter.onNext(metadata);
                    } else if (emittedType.isInstance(element)) {
                        emitter.onNext(element);
                    } else {
                        Optional converted = this.conversionService.convert(metadata, emittedType);
                        if (converted.isPresent()) {
                            emitter.onNext(converted.get());
                        }
                    }
                    emitter.onComplete();
                });
            }, BackpressureStrategy.BUFFER);
        });
        if (duration == null) {
            return sends;
        }
        return sends.timeout(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps a send failure in a {@link MessagingClientException} that names the client method.
     *
     * @param methodInvocationContext the intercepted invocation that triggered the send
     * @param th                      the underlying failure, kept as the cause
     * @return the exception to throw or signal
     */
    public MessagingClientException wrapException(MethodInvocationContext<Object, Object> methodInvocationContext, Throwable th) {
        String message = "Exception sending producer record for method [" + methodInvocationContext + "]: " + th.getMessage();
        return new MessagingClientException(message, th);
    }

    /**
     * Creates the record to send, without an explicit partition.
     *
     * @param str  the topic
     * @param list the record headers; an empty list is passed on as {@code null}
     * @param obj  the record key
     * @param obj2 the record value
     * @param l    the record timestamp, possibly {@code null}
     * @return the new producer record
     */
    private ProducerRecord buildProducerRecord(String str, List<org.apache.kafka.common.header.Header> list, Object obj, Object obj2, Long l) {
        List<org.apache.kafka.common.header.Header> recordHeaders = null;
        if (!list.isEmpty()) {
            recordHeaders = list;
        }
        return new ProducerRecord(str, (Integer) null, l, obj, obj2, recordHeaders);
    }

    /**
     * Returns the producer for the given body/key arguments and client annotation, creating and
     * caching it on first use.
     *
     * <p>Producers are cached by key type (or {@code byte[]} when there is no key argument), value
     * type and client id. A new producer is configured from the producer configuration bean named
     * after the client id when one exists, otherwise from the default configuration bean, and then
     * customised with the annotation's {@code maxBlock}, {@code acks} and {@code properties}
     * members. Key and value serializers are picked from the serde registry unless the
     * configuration already defines them.</p>
     *
     * @param argument           the body argument
     * @param argument2          the key argument, or {@code null} if the method declares none
     * @param annotationMetadata the annotation metadata carrying the {@code @KafkaClient} settings
     * @return the cached or newly created producer
     */
    private Producer getProducer(Argument argument, @Nullable Argument argument2, AnnotationMetadata annotationMetadata) {
        Class<?> keyType = argument2 != null ? argument2.getType() : byte[].class;
        String clientId = annotationMetadata.getValue(KafkaClient.class, String.class).orElse(null);
        return this.producerMap.computeIfAbsent(new ProducerKey(keyType, argument.getType(), clientId), producerKey -> {
            AbstractKafkaProducerConfiguration configuration;
            if (clientId != null) {
                // Prefer a configuration bean named after the client id, else use the default one.
                Optional<KafkaProducerConfiguration> namedConfiguration = this.beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName(clientId));
                if (namedConfiguration.isPresent()) {
                    configuration = namedConfiguration.get();
                } else {
                    configuration = this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
                }
            } else {
                configuration = this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
            }
            // Work on a copy-constructed configuration rather than on the bean that was looked up.
            DefaultKafkaProducerConfiguration newConfiguration = new DefaultKafkaProducerConfiguration(configuration);
            Properties config = newConfiguration.getConfig();
            if (clientId != null) {
                config.putIfAbsent("client.id", clientId);
            }
            annotationMetadata.getValue(KafkaClient.class, "maxBlock", Duration.class)
                    .ifPresent(maxBlock -> config.put("max.block.ms", String.valueOf(maxBlock.toMillis())));
            int acks = annotationMetadata.getValue(KafkaClient.class, "acks", Integer.class).orElse(KafkaClient.Acknowledge.DEFAULT);
            // Integer.MIN_VALUE means "leave acks as configured" (presumably the value of Acknowledge.DEFAULT).
            if (acks != Integer.MIN_VALUE) {
                // -1 is written as "all"; any other value is written as its decimal string.
                config.put("acks", acks == -1 ? "all" : String.valueOf(acks));
            }
            annotationMetadata.findAnnotation(KafkaClient.class)
                    .map(clientAnnotation -> clientAnnotation.getProperties("properties", "name"))
                    .ifPresent(config::putAll);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Creating new KafkaProducer.");
            }
            if (!config.containsKey("key.serializer") && !newConfiguration.getKeySerializer().isPresent()) {
                Serializer keySerializer = argument2 != null ? this.serdeRegistry.pickSerializer(argument2) : new ByteArraySerializer();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using Kafka key serializer: {}", keySerializer);
                }
                newConfiguration.setKeySerializer(keySerializer);
            }
            if (!config.containsKey("value.serializer") && !newConfiguration.getValueSerializer().isPresent()) {
                // In batch mode the body is a container, so serialize its element type instead.
                Argument valueArgument = annotationMetadata.isTrue(KafkaClient.class, "batch")
                        ? (Argument) argument.getFirstTypeVariable().orElse(argument)
                        : argument;
                Serializer valueSerializer = this.serdeRegistry.pickSerializer(valueArgument);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using Kafka value serializer: {}", valueSerializer);
                }
                newConfiguration.setValueSerializer(valueSerializer);
            }
            return (Producer) this.beanContext.createBean(Producer.class, new Object[]{newConfiguration});
        });
    }
}
