/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.autoconfig;

import java.lang.invoke.MethodHandles;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.Optional;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.cloudevent.CloudEventKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.springboot.TokenStoreProperties;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;

@AutoConfiguration
@ConditionalOnExpression(value="${axon.kafka.publisher.enabled:true} or ${axon.kafka.fetcher.enabled:true}")
@AutoConfigureAfter(value={AxonAutoConfiguration.class})
@AutoConfigureBefore(value={InfraConfiguration.class})
@EnableConfigurationProperties(value={KafkaProperties.class, TokenStoreProperties.class})
public class KafkaAutoConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final KafkaProperties properties;
    private final TokenStoreProperties tokenStoreProperties;

    public KafkaAutoConfiguration(KafkaProperties properties, TokenStoreProperties tokenStoreProperties) {
        this.properties = properties;
        this.tokenStoreProperties = tokenStoreProperties;
    }

    @Bean
    @ConditionalOnMissingBean
    public KafkaMessageConverter<?, ?> kafkaMessageConverter(@Qualifier(value="eventSerializer") Serializer eventSerializer, Configuration configuration) {
        KafkaProperties.MessageConverterMode converterMode = this.properties.getMessageConverterMode();
        if (converterMode == KafkaProperties.MessageConverterMode.DEFAULT) {
            return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(configuration.upcasterChain() != null ? configuration.upcasterChain() : new EventUpcasterChain(new EventUpcaster[0])).build();
        }
        if (converterMode == KafkaProperties.MessageConverterMode.CLOUD_EVENT) {
            return CloudEventKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(configuration.upcasterChain() != null ? configuration.upcasterChain() : new EventUpcasterChain(new EventUpcaster[0])).build();
        }
        throw new AxonConfigurationException("Unknown Kafka Message Converter Mode [" + (Object)((Object)converterMode) + "] detected");
    }

    @Bean(value={"axonKafkaProducerFactory"})
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name={"axon.kafka.publisher.enabled"}, havingValue="true", matchIfMissing=true)
    public <K, V> ProducerFactory<?, ?> kafkaProducerFactory() {
        ConfirmationMode confirmationMode = this.properties.getPublisher().getConfirmationMode();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        DefaultProducerFactory.Builder builder = DefaultProducerFactory.builder().configuration(this.properties.buildProducerProperties()).confirmationMode(confirmationMode);
        if (this.isNonEmptyString(transactionIdPrefix)) {
            builder.transactionalIdPrefix(transactionIdPrefix).confirmationMode(ConfirmationMode.TRANSACTIONAL);
            if (!confirmationMode.isTransactional()) {
                logger.warn("The confirmation mode is set to [{}], whilst a transactional id prefix is present. The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", (Object)confirmationMode);
            }
        }
        return builder.build();
    }

    private boolean isNonEmptyString(String s) {
        return s != null && !s.equals("");
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(value={ProducerFactory.class, KafkaMessageConverter.class})
    @ConditionalOnProperty(name={"axon.kafka.publisher.enabled"}, havingValue="true", matchIfMissing=true)
    public TopicResolver topicResolver() {
        return m -> Optional.of(this.properties.getDefaultTopic());
    }

    @ConditionalOnMissingBean
    @Bean(destroyMethod="shutDown")
    @ConditionalOnBean(value={ProducerFactory.class, KafkaMessageConverter.class})
    @ConditionalOnProperty(name={"axon.kafka.publisher.enabled"}, havingValue="true", matchIfMissing=true)
    public <K, V> KafkaPublisher<?, ?> kafkaPublisher(@Qualifier(value="eventSerializer") Serializer eventSerializer, ProducerFactory<K, V> axonKafkaProducerFactory, KafkaMessageConverter<K, V> kafkaMessageConverter, Configuration configuration, TopicResolver topicResolver) {
        return KafkaPublisher.builder().serializer(eventSerializer).producerFactory(axonKafkaProducerFactory).messageConverter(kafkaMessageConverter).messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")).topicResolver(topicResolver).build();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(value={KafkaPublisher.class})
    @ConditionalOnProperty(name={"axon.kafka.publisher.enabled"}, havingValue="true", matchIfMissing=true)
    public <K, V> KafkaEventPublisher<?, ?> kafkaEventPublisher(KafkaPublisher<K, V> kafkaPublisher, KafkaProperties kafkaProperties, EventProcessingConfigurer eventProcessingConfigurer) {
        KafkaEventPublisher kafkaEventPublisher = KafkaEventPublisher.builder().processingGroup(this.properties.getPublisher().getProcessingGroup()).kafkaPublisher(kafkaPublisher).build();
        eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher).registerListenerInvocationErrorHandler(kafkaEventPublisher.getProcessingGroup(), configuration -> PropagatingErrorHandler.instance()).assignHandlerTypesMatching(kafkaEventPublisher.getProcessingGroup(), clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class));
        KafkaProperties.EventProcessorMode processorMode = kafkaProperties.getProducer().getEventProcessorMode();
        if (processorMode == KafkaProperties.EventProcessorMode.SUBSCRIBING) {
            eventProcessingConfigurer.registerSubscribingEventProcessor(kafkaEventPublisher.getProcessingGroup());
        } else if (processorMode == KafkaProperties.EventProcessorMode.TRACKING) {
            eventProcessingConfigurer.registerTrackingEventProcessor(kafkaEventPublisher.getProcessingGroup());
        } else if (processorMode == KafkaProperties.EventProcessorMode.POOLED_STREAMING) {
            eventProcessingConfigurer.registerPooledStreamingEventProcessor(kafkaEventPublisher.getProcessingGroup());
        } else {
            throw new AxonConfigurationException("Unknown Event Processor Mode [" + (Object)((Object)processorMode) + "] detected");
        }
        return kafkaEventPublisher;
    }

    @Bean
    @ConditionalOnMissingBean
    @Conditional(value={ProducerStreamingProcessorModeCondition.class})
    @ConditionalOnProperty(name={"axon.kafka.fetcher.enabled"}, havingValue="true", matchIfMissing=true)
    public TokenStore tokenStore(Serializer serializer) {
        return KafkaTokenStore.builder().serializer(serializer).consumerConfiguration(this.properties.buildConsumerProperties()).producerConfiguration(this.properties.buildProducerProperties()).claimTimeout((TemporalAmount)this.tokenStoreProperties.getClaimTimeout()).build();
    }

    @Bean(value={"axonKafkaConsumerFactory"})
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name={"axon.kafka.fetcher.enabled"}, havingValue="true", matchIfMissing=true)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    @ConditionalOnMissingBean
    @Bean(destroyMethod="shutdown")
    @ConditionalOnProperty(name={"axon.kafka.fetcher.enabled"}, havingValue="true", matchIfMissing=true)
    public Fetcher<?, ?, ?> kafkaFetcher() {
        return AsyncFetcher.builder().pollTimeout(this.properties.getFetcher().getPollTimeout()).build();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(value={ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class})
    @Conditional(value={ConsumerStreamingProcessorModeCondition.class})
    @ConditionalOnProperty(name={"axon.kafka.fetcher.enabled"}, havingValue="true", matchIfMissing=true)
    public <K, V> StreamableKafkaMessageSource<?, ?> streamableKafkaMessageSource(@Qualifier(value="eventSerializer") Serializer eventSerializer, ConsumerFactory<K, V> kafkaConsumerFactory, Fetcher<K, V, KafkaEventMessage> kafkaFetcher, KafkaMessageConverter<K, V> kafkaMessageConverter) {
        return ((StreamableKafkaMessageSource.Builder)StreamableKafkaMessageSource.builder().topics(Collections.singletonList(this.properties.getDefaultTopic()))).serializer(eventSerializer).consumerFactory(kafkaConsumerFactory).fetcher(kafkaFetcher).messageConverter(kafkaMessageConverter).bufferFactory(() -> new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize())).build();
    }

    private static class ProducerStreamingProcessorModeCondition
    extends AnyNestedCondition {
        public ProducerStreamingProcessorModeCondition() {
            super(ConfigurationCondition.ConfigurationPhase.REGISTER_BEAN);
        }

        @ConditionalOnProperty(name={"axon.kafka.producer.event-processor-mode"}, havingValue="pooled_streaming")
        static class PooledStreamingCondition {
            PooledStreamingCondition() {
            }
        }

        @ConditionalOnProperty(name={"axon.kafka.producer.event-processor-mode"}, havingValue="tracking")
        static class TrackingCondition {
            TrackingCondition() {
            }
        }
    }

    private static class ConsumerStreamingProcessorModeCondition
    extends AnyNestedCondition {
        public ConsumerStreamingProcessorModeCondition() {
            super(ConfigurationCondition.ConfigurationPhase.REGISTER_BEAN);
        }

        @ConditionalOnProperty(name={"axon.kafka.consumer.event-processor-mode"}, havingValue="pooled_streaming")
        static class PooledStreamingCondition {
            PooledStreamingCondition() {
            }
        }

        @ConditionalOnProperty(name={"axon.kafka.consumer.event-processor-mode"}, havingValue="tracking")
        static class TrackingCondition {
            TrackingCondition() {
            }
        }
    }
}

