package org.springframework.integration.aggregator;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-6.0.2.jar:org/springframework/integration/aggregator/FluxAggregatorMessageHandler.class */
public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements ManageableLifecycle {
    private Predicate<Message<?>> boundaryTrigger;
    private Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer;
    private Duration windowTimespan;
    private FluxSink<Message<?>> sink;
    private volatile Disposable subscription;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
    private Function<Message<?>, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
    private Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction = this::messageForWindowFlux;
    private final Flux<Message<?>> aggregatorFlux = Flux.create(fluxSink -> {
        this.sink = fluxSink;
    }, FluxSink.OverflowStrategy.BUFFER).groupBy(this::groupBy).flatMap(groupedFlux -> {
        return groupedFlux.transform(this::releaseBy);
    }).publish().autoConnect();

    private Object groupBy(Message<?> message) {
        return this.correlationStrategy.getCorrelationKey(message);
    }

    private Flux<Message<?>> releaseBy(Flux<Message<?>> flux) {
        return flux.transform(this.windowConfigurer != null ? this.windowConfigurer : this::applyWindowOptions).flatMap(flux2 -> {
            return flux2.transform(this.combineFunction);
        });
    }

    private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> flux) {
        return this.boundaryTrigger != null ? flux.windowUntil(this.boundaryTrigger) : flux.switchOnFirst((signal, flux2) -> {
            if (!signal.hasValue()) {
                return Flux.just(flux2);
            }
            Integer apply = this.windowSizeFunction.apply((Message) signal.get());
            return apply != null ? this.windowTimespan != null ? flux2.windowTimeout(apply.intValue(), this.windowTimespan) : flux2.window(apply.intValue()) : this.windowTimespan != null ? flux2.window(this.windowTimespan) : Flux.error(new IllegalStateException("One of the 'boundaryTrigger', 'windowSizeFunction' or 'windowTimespan' options must be configured or 'sequenceSize' header must be supplied in the messages to aggregate."));
        });
    }

    public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
        Assert.notNull(correlationStrategy, "'correlationStrategy' must not be null");
        this.correlationStrategy = correlationStrategy;
    }

    public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> function) {
        Assert.notNull(function, "'combineFunction' must not be null");
        this.combineFunction = function;
    }

    public void setBoundaryTrigger(Predicate<Message<?>> predicate) {
        this.boundaryTrigger = predicate;
    }

    public void setWindowSize(int i) {
        setWindowSizeFunction(message -> {
            return Integer.valueOf(i);
        });
    }

    public void setWindowSizeFunction(Function<Message<?>, Integer> function) {
        Assert.notNull(function, "'windowSizeFunction' must not be null");
        this.windowSizeFunction = function;
    }

    public void setWindowTimespan(Duration duration) {
        this.windowTimespan = duration;
    }

    public void setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> function) {
        this.windowConfigurer = function;
    }

    @Override // org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "flux-aggregator";
    }

    @Override // org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.IntegrationPattern
    public IntegrationPatternType getIntegrationPatternType() {
        return IntegrationPatternType.aggregator;
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void start() {
        if (this.subscribed.compareAndSet(false, true)) {
            MessageChannel outputChannel = getOutputChannel();
            if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
                ((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(this.aggregatorFlux);
            } else {
                this.subscription = this.aggregatorFlux.subscribe(message -> {
                    produceOutput(message, message);
                });
            }
        }
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void stop() {
        if (!this.subscribed.compareAndSet(true, false) || this.subscription == null) {
            return;
        }
        this.subscription.dispose();
    }

    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.subscribed.get();
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        Assert.state(isRunning(), "The 'FluxAggregatorMessageHandler' has not been started to accept incoming messages");
        this.sink.next(message);
    }

    @Override // org.springframework.integration.handler.AbstractMessageProducingHandler
    protected boolean shouldCopyRequestHeaders() {
        return false;
    }

    private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> flux) {
        Flux<Message<?>> autoConnect = flux.publish().autoConnect();
        return autoConnect.next().map(message -> {
            return getMessageBuilderFactory().withPayload(Flux.concat(Mono.just(message), autoConnect)).copyHeaders(message.getHeaders()).build();
        });
    }

    private static Integer sequenceSizeHeader(Message<?> message) {
        return (Integer) message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
    }
}
