package com.iohao.game.bolt.broker.cluster;

import com.iohao.game.action.skeleton.toy.IoGameBanner;
import com.iohao.game.bolt.broker.core.common.IoGameGlobalConfig;
import com.iohao.game.bolt.broker.core.kit.HessianKit;
import com.iohao.game.bolt.broker.core.message.BrokerClusterMessage;
import com.iohao.game.bolt.broker.core.message.BrokerMessage;
import com.iohao.game.common.kit.ExecutorKit;
import com.iohao.game.common.kit.NetworkKit;
import com.iohao.game.common.kit.log.IoGameLoggerFactory;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jctools.maps.NonBlockingHashMap;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/iohao/game/bolt/broker/cluster/BrokerClusterManager.class */
/**
 * Cluster membership manager for a broker (game gateway).
 *
 * <p>The manager starts a {@link Cluster} that uses itself as the
 * {@link ClusterMessageHandler}, keeps a map of known brokers keyed by their
 * cluster address ({@code host:gossipPort}), and forwards the current broker
 * list to an optional {@link ClusterMessageListener}.
 *
 * <p>Typical use: configure through the chained setters
 * ({@link #setBrokerId}, {@link #setPort}, {@link #setGossipListenPort},
 * {@link #setSeedAddress}) and then call {@link #start()}.
 */
public class BrokerClusterManager implements ClusterMessageHandler {
    static final Logger log = IoGameLoggerFactory.getLoggerCluster();

    /** Id of the local broker. */
    private String brokerId;
    /** Port advertised in the local broker's broker address. */
    private int port;
    /** Port the cluster transport listens on; also advertised as the external port. */
    private int gossipListenPort;
    /** Seed member addresses, each parsed with {@link Address#from(String)}. */
    private List<String> seedAddress;
    /** Description of this node; created by {@link #start()}. */
    private Broker localBroker;
    /** Handle to the started cluster; created by {@link #start()}. */
    private Mono<Cluster> clusterMono;
    /** Optional listener that receives the broker list from {@link #inform()}. */
    private ClusterMessageListener clusterMessageListener;
    private final String clusterName = "io_game_cluster";
    /** Known brokers keyed by cluster address. */
    private Map<String, Broker> brokers = new NonBlockingHashMap<>();
    /** Set to {@code true} once the periodic cluster log task has been scheduled. */
    private AtomicBoolean openScheduledLog = new AtomicBoolean(false);
    /** Sink that receives the brokers collection after membership events. */
    private Sinks.Many<Collection<Broker>> brokersEmitterProcessor = Sinks.many().multicast().onBackpressureBuffer();

    /**
     * Registers the local broker and starts the cluster.
     *
     * <p>The local broker is created and stored <em>before</em> the cluster is
     * subscribed, so that {@link #onMessage(Message)} never serializes a
     * {@code null} local broker when a peer asks for details right after startup.
     */
    public void start() {
        String localIp = NetworkKit.LOCAL_IP;
        String clusterAddress = localIp + ":" + this.gossipListenPort;

        this.localBroker = new Broker(localIp)
                .setId(this.brokerId)
                .setPort(this.port)
                .setBrokerAddress(localIp + ":" + this.port)
                .setClusterAddress(clusterAddress);
        this.brokers.put(clusterAddress, this.localBroker);

        List<Address> seedMembers = listSeedMemberAddress();
        this.clusterMono = new ClusterImpl()
                .config(clusterConfig -> clusterConfig
                        .memberAlias("Gateway Broker")
                        .externalHost(localIp)
                        .externalPort(this.gossipListenPort))
                .membership(membershipConfig -> membershipConfig.seedMembers(seedMembers).syncInterval(5000))
                .transportFactory(TcpTransportFactory::new)
                .transport(transportConfig -> transportConfig.port(this.gossipListenPort))
                .handler(cluster -> this)
                .start();

        // Log start-up failures instead of letting Reactor drop them.
        this.clusterMono.subscribe(null,
                error -> log.error("broker cluster failed to start on gossip port {}", this.gossipListenPort, error));
    }

    /**
     * Schedules a task (initial delay 1s, period 8s) that sends a fixed greeting
     * message to every other cluster member.
     */
    void send() {
        ExecutorKit.newSingleScheduled(BrokerClusterManager.class.toString()).scheduleAtFixedRate(() -> {
            Message greeting = Message.fromData("Greetings from Carol~~~~~");
            if (IoGameGlobalConfig.isBrokerClusterLog()) {
                log.info("message : {}", greeting);
            }
            this.clusterMono.subscribe(cluster -> {
                Flux.fromIterable(cluster.otherMembers())
                        .flatMap(member -> cluster.send(member, greeting))
                        .subscribe(null, error -> log.error("failed to send greeting to cluster members", error));
            });
        }, 1L, 8L, TimeUnit.SECONDS);
    }

    /**
     * Passes the current broker list to the configured listener.
     *
     * <p>Does nothing when no listener is set or when cluster logging is disabled.
     * On the first effective call a periodic debug log task (initial delay 1s,
     * period 10s) is scheduled; {@code compareAndSet} guarantees that concurrent
     * callers schedule it only once.
     */
    public void inform() {
        // NOTE(review): the listener is only notified while cluster logging is enabled;
        // this mirrors the existing behaviour - confirm that the coupling is intended.
        if (Objects.isNull(this.clusterMessageListener) || !IoGameGlobalConfig.isBrokerClusterLog()) {
            return;
        }

        if (this.openScheduledLog.compareAndSet(false, true)) {
            ExecutorKit.newSingleScheduled(BrokerClusterManager.class.getName()).scheduleAtFixedRate(() -> {
                BrokerClusterMessage snapshot = getBrokerClusterMessage();
                log.debug("broker（游戏网关）: [{}] --  集群数量[{}] - 详细：[{}]",
                        this.localBroker.getPort(), snapshot.count(), snapshot);
            }, 1L, 10L, TimeUnit.SECONDS);
        }

        this.clusterMessageListener.inform(getBrokerClusterMessage());
    }

    /**
     * Builds a message describing every broker currently in the brokers map.
     *
     * @return a new {@link BrokerClusterMessage} holding one {@link BrokerMessage}
     *         (broker address and id) per known broker
     */
    public BrokerClusterMessage getBrokerClusterMessage() {
        List<BrokerMessage> brokerMessages = this.brokers.values().stream().map(broker -> {
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setAddress(broker.getBrokerAddress());
            brokerMessage.setId(broker.getId());
            return brokerMessage;
        }).collect(Collectors.toList());

        BrokerClusterMessage clusterMessage = new BrokerClusterMessage();
        clusterMessage.setBrokerMessageList(brokerMessages);
        return clusterMessage;
    }

    /**
     * Cluster callback for membership changes.
     *
     * <ul>
     *   <li>added: asks the new member for its broker details and, once the reply
     *       arrives, stores the broker, informs the listener and publishes the
     *       brokers collection;</li>
     *   <li>removed / leaving: drops the broker and informs the listener.</li>
     * </ul>
     * The brokers collection is also published at the end of every call.
     *
     * @param membershipEvent the membership event delivered by the cluster
     */
    public void onMembershipEvent(MembershipEvent membershipEvent) {
        Member member = membershipEvent.member();
        Broker eventBroker = memberToBroker(member);
        String clusterAddress = eventBroker.getClusterAddress();

        if (membershipEvent.isAdded()) {
            makeCall(member).subscribe(payload -> {
                Broker remote = (Broker) HessianKit.deserialize(payload, Broker.class);
                log.info("onMembershipEvent {}", remote);
                IoGameBanner.render();
                eventBroker.setId(remote.getId()).setPort(remote.getPort()).setBrokerAddress(remote.getBrokerAddress());
                this.brokers.put(clusterAddress, eventBroker);
                inform();
                // The reply arrives asynchronously, after the emit at the end of this
                // method; publish again so subscribers are notified of the actual change.
                publishBrokers();
            }, error -> log.error("failed to fetch broker details from new member {}", clusterAddress, error));
        } else if (membershipEvent.isRemoved()) {
            this.brokers.remove(clusterAddress);
            log.info("isRemoved onMembershipEvent: {}", clusterAddress);
            inform();
        } else if (membershipEvent.isLeaving()) {
            this.brokers.remove(clusterAddress);
            log.info("isLeaving onMembershipEvent: {}", clusterAddress);
            inform();
        }

        publishBrokers();
    }

    /**
     * Cluster callback for gossip messages; only logs them when cluster logging is enabled.
     *
     * @param message the gossip message
     */
    public void onGossip(Message message) {
        if (IoGameGlobalConfig.isBrokerClusterLog()) {
            log.info("Message gossip : {}", message);
        }
    }

    /**
     * Cluster callback for direct messages. A message carrying the {@code "added"}
     * header is answered with the serialized local broker, using the same
     * correlation id; any other message is ignored.
     *
     * @param message the received message
     */
    public void onMessage(Message message) {
        if (message.header("added") == null) {
            return;
        }

        byte[] payload = HessianKit.serialize(this.localBroker);
        log.info("onMessage {}", this.localBroker);
        Message reply = Message.builder().correlationId(message.correlationId()).data(payload).build();
        this.clusterMono
                .flatMap(cluster -> cluster.send(message.sender(), reply))
                .subscribe(null, error -> log.error("failed to reply with local broker details", error));
    }

    /**
     * Sends an {@code "added"} request to the given member and returns the reply payload.
     *
     * @param member the member to query
     * @return the data of the response message
     */
    private Mono<byte[]> makeCall(Member member) {
        IoGameBanner.render();
        Message request = Message.builder()
                .correlationId(UUID.randomUUID().toString())
                .header("added", "")
                .build();
        return this.clusterMono
                .flatMap(cluster -> cluster.requestResponse(member, request))
                .map(Message::data);
    }

    /** Creates a broker from the member's host, with the member's address as cluster address. */
    private Broker memberToBroker(Member member) {
        Address address = member.address();
        return new Broker(address.host()).setClusterAddress(address.toString());
    }

    /**
     * Parses the configured seed addresses.
     *
     * @return the parsed addresses, or an empty list when no seed addresses were configured
     */
    private List<Address> listSeedMemberAddress() {
        if (this.seedAddress == null) {
            return List.of();
        }
        return this.seedAddress.stream().map(Address::from).toList();
    }

    /** Pushes the live brokers collection to the sink; the emit result is ignored, as before. */
    private void publishBrokers() {
        this.brokersEmitterProcessor.tryEmitNext(this.brokers.values());
    }

    /** @return the fixed cluster name, {@code "io_game_cluster"} */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @return the id of the local broker */
    public String getBrokerId() {
        return this.brokerId;
    }

    /** @return the broker port */
    public int getPort() {
        return this.port;
    }

    /** @return the port used by the cluster transport */
    public int getGossipListenPort() {
        return this.gossipListenPort;
    }

    /** @return the configured seed addresses, possibly {@code null} */
    public List<String> getSeedAddress() {
        return this.seedAddress;
    }

    /** @return the local broker, {@code null} until {@link #start()} or {@link #setLocalBroker} is called */
    public Broker getLocalBroker() {
        return this.localBroker;
    }

    /** @return the cluster handle, {@code null} until {@link #start()} or {@link #setClusterMono} is called */
    public Mono<Cluster> getClusterMono() {
        return this.clusterMono;
    }

    /** @return the listener notified by {@link #inform()}, possibly {@code null} */
    public ClusterMessageListener getClusterMessageListener() {
        return this.clusterMessageListener;
    }

    /** @return the live map of known brokers keyed by cluster address */
    public Map<String, Broker> getBrokers() {
        return this.brokers;
    }

    /** @return the flag recording whether the periodic log task has been scheduled */
    public AtomicBoolean getOpenScheduledLog() {
        return this.openScheduledLog;
    }

    /** @return the sink that receives the brokers collection */
    public Sinks.Many<Collection<Broker>> getBrokersEmitterProcessor() {
        return this.brokersEmitterProcessor;
    }

    /**
     * @param brokerId the id of the local broker
     * @return this manager, for chaining
     */
    public BrokerClusterManager setBrokerId(String brokerId) {
        this.brokerId = brokerId;
        return this;
    }

    /**
     * @param port the broker port
     * @return this manager, for chaining
     */
    public BrokerClusterManager setPort(int port) {
        this.port = port;
        return this;
    }

    /**
     * @param gossipListenPort the port used by the cluster transport
     * @return this manager, for chaining
     */
    public BrokerClusterManager setGossipListenPort(int gossipListenPort) {
        this.gossipListenPort = gossipListenPort;
        return this;
    }

    /**
     * @param seedAddress the seed member addresses
     * @return this manager, for chaining
     */
    public BrokerClusterManager setSeedAddress(List<String> seedAddress) {
        this.seedAddress = seedAddress;
        return this;
    }

    /**
     * @param localBroker the local broker; note that {@link #start()} replaces it
     * @return this manager, for chaining
     */
    public BrokerClusterManager setLocalBroker(Broker localBroker) {
        this.localBroker = localBroker;
        return this;
    }

    /**
     * @param clusterMono the cluster handle; note that {@link #start()} replaces it
     * @return this manager, for chaining
     */
    public BrokerClusterManager setClusterMono(Mono<Cluster> clusterMono) {
        this.clusterMono = clusterMono;
        return this;
    }

    /**
     * @param clusterMessageListener the listener notified by {@link #inform()}
     * @return this manager, for chaining
     */
    public BrokerClusterManager setClusterMessageListener(ClusterMessageListener clusterMessageListener) {
        this.clusterMessageListener = clusterMessageListener;
        return this;
    }

    /**
     * @param brokers the map used to track known brokers
     * @return this manager, for chaining
     */
    public BrokerClusterManager setBrokers(Map<String, Broker> brokers) {
        this.brokers = brokers;
        return this;
    }

    /**
     * @param openScheduledLog the flag recording whether the periodic log task has been scheduled
     * @return this manager, for chaining
     */
    public BrokerClusterManager setOpenScheduledLog(AtomicBoolean openScheduledLog) {
        this.openScheduledLog = openScheduledLog;
        return this;
    }

    /**
     * @param brokersEmitterProcessor the sink that receives the brokers collection
     * @return this manager, for chaining
     */
    public BrokerClusterManager setBrokersEmitterProcessor(Sinks.Many<Collection<Broker>> brokersEmitterProcessor) {
        this.brokersEmitterProcessor = brokersEmitterProcessor;
        return this;
    }
}
