package org.springframework.cloud.netflix.turbine.amqp;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import rx.Observable;
import rx.subjects.PublishSubject;

@EnableConfigurationProperties({TurbineAmqpProperties.class})
@Configuration
/* loaded from: input_file:org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpConfiguration.class */
public class TurbineAmqpConfiguration implements SmartLifecycle {
    private static final Log log = LogFactory.getLog(TurbineAmqpConfiguration.class);
    private boolean running = false;

    @Autowired
    private TurbineAmqpProperties turbine;
    private int turbinePort;

    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        return PublishSubject.create();
    }

    @Bean
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        Observable refCount = StreamAggregator.aggregateGroupedStreams(hystrixSubject().groupBy(map -> {
            return InstanceKey.create((String) map.get("instanceId"));
        })).doOnUnsubscribe(() -> {
            log.info("Unsubscribing aggregation.");
        }).doOnSubscribe(() -> {
            log.info("Starting aggregation");
        }).flatMap(groupedObservable -> {
            return groupedObservable;
        }).publish().refCount();
        this.turbinePort = this.turbine.getPort();
        if (this.turbinePort <= 0) {
            this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
        }
        return RxNetty.createHttpServer(this.turbinePort, (httpServerRequest, httpServerResponse) -> {
            log.info("SSE Request Received");
            httpServerResponse.getHeaders().setHeader("Content-Type", "text/event-stream");
            return refCount.doOnUnsubscribe(() -> {
                log.info("Unsubscribing RxNetty server connection");
            }).flatMap(map2 -> {
                return httpServerResponse.writeAndFlush(new ServerSentEvent((String) null, (String) null, JsonUtility.mapToJson(map2)));
            });
        }, PipelineConfigurators.sseServerConfigurator());
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    public void start() {
        aggregatorServer().start();
    }

    public void stop() {
        try {
            aggregatorServer().shutdown();
        } catch (InterruptedException e) {
            log.error("Error shutting down", e);
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public int getTurbinePort() {
        return this.turbinePort;
    }
}
