/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.AbstractRpcClient;
import org.apache.flume.api.HostInfo;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.util.SpecificOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An RPC client that distributes events over several hosts.
 *
 * <p>For every {@link #append(Event)} or {@link #appendBatch(List)} call the
 * configured {@link HostSelector} produces an ordering of the known hosts. The
 * hosts are tried in that order until one accepts the data; if none does, an
 * {@link EventDeliveryException} is thrown.
 *
 * <p>One delegate {@link RpcClient} per host is created lazily and cached by the
 * host's reference name. Access to that cache is serialized on this instance.
 */
public class LoadBalancingRpcClient extends AbstractRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancingRpcClient.class);

    /** Hosts parsed from the configuration in {@link #configure(Properties)}. */
    private List<HostInfo> hosts;

    /** Strategy deciding the order in which hosts are tried for each call. */
    private HostSelector selector;

    /** Delegate clients keyed by host reference name; guarded by {@code this}. */
    private Map<String, RpcClient> clientMap;

    /** Private copy of the properties handed to {@link #configure(Properties)}. */
    private Properties configurationProperties;

    /**
     * Sends a single event to the first host, in selector order, that accepts it.
     *
     * @param event the event to deliver
     * @throws EventDeliveryException if every host failed
     */
    @Override
    public void append(Event event) throws EventDeliveryException {
        Iterator<HostInfo> candidates = selector.createHostIterator();
        while (candidates.hasNext()) {
            HostInfo host = candidates.next();
            try {
                getClient(host).append(event);
                return;
            } catch (Exception ex) {
                // Best effort: log and fall through to the next candidate host.
                LOGGER.warn("Failed to send event to host " + host, ex);
            }
        }
        throw new EventDeliveryException("Unable to send event to any host");
    }

    /**
     * Sends a batch of events to the first host, in selector order, that accepts it.
     *
     * @param events the events to deliver
     * @throws EventDeliveryException if every host failed
     */
    @Override
    public void appendBatch(List<Event> events) throws EventDeliveryException {
        Iterator<HostInfo> candidates = selector.createHostIterator();
        while (candidates.hasNext()) {
            HostInfo host = candidates.next();
            try {
                // Obtaining the client happens inside the try so that a failure to
                // (re)create the delegate for one host fails over to the next host,
                // exactly as append(Event) does.
                getClient(host).appendBatch(events);
                return;
            } catch (Exception ex) {
                LOGGER.warn("Failed to send batch to host " + host, ex);
            }
        }
        throw new EventDeliveryException("Unable to send batch to any host");
    }

    /**
     * Always returns {@code true}; this class keeps no closed or disabled state,
     * and delegates are re-created on demand by later append calls.
     */
    @Override
    public boolean isActive() {
        return true;
    }

    /**
     * Closes every cached delegate client and empties the cache. A failure to
     * close one delegate is logged and does not prevent closing the others.
     */
    @Override
    public void close() throws FlumeException {
        synchronized (this) {
            for (Map.Entry<String, RpcClient> entry : clientMap.entrySet()) {
                RpcClient client = entry.getValue();
                if (client == null) {
                    continue;
                }
                try {
                    client.close();
                } catch (Exception ex) {
                    LOGGER.warn("Failed to close client: " + entry.getKey(), ex);
                }
            }
            clientMap.clear();
        }
    }

    /**
     * Initializes this client from the given properties.
     *
     * <p>The property {@code host-selector} picks the selection strategy:
     * {@code ROUND_ROBIN} (the default), {@code RANDOM}, or the fully qualified
     * name of a {@link HostSelector} implementation with a no-argument constructor.
     * The two built-in names are matched case-insensitively.
     *
     * @param properties client configuration
     * @throws FlumeException if fewer than two hosts are configured or the
     *         requested host selector cannot be instantiated
     */
    @Override
    protected void configure(Properties properties) throws FlumeException {
        clientMap = new HashMap<String, RpcClient>();

        // Keep a private copy so later changes by the caller do not leak in.
        configurationProperties = new Properties();
        configurationProperties.putAll(properties);

        hosts = HostInfo.getHostInfoList(properties);
        if (hosts.size() < 2) {
            throw new FlumeException("At least two hosts are required to use the load balancing RPC client.");
        }

        selector = createSelector(properties.getProperty("host-selector", "ROUND_ROBIN"));
        selector.setHosts(hosts);
    }

    /** Resolves a selector name (built-in alias or class name) to a new instance. */
    private static HostSelector createSelector(String lbTypeName) {
        if (lbTypeName.equalsIgnoreCase("ROUND_ROBIN")) {
            return new RoundRobinHostSelector();
        }
        if (lbTypeName.equalsIgnoreCase("RANDOM")) {
            return new RandomOrderHostSelector();
        }
        try {
            Class<?> klass = Class.forName(lbTypeName);
            // getDeclaredConstructor().newInstance() replaces the deprecated
            // Class.newInstance(); any failure, including a class that is not a
            // HostSelector, is reported as a FlumeException below.
            return (HostSelector) klass.getDeclaredConstructor().newInstance();
        } catch (Exception ex) {
            throw new FlumeException("Unable to instantiate host selector: " + lbTypeName, ex);
        }
    }

    /**
     * Returns the cached delegate for the given host, creating it on first use
     * and replacing it (after a best-effort close) when it reports itself inactive.
     */
    private synchronized RpcClient getClient(HostInfo info) {
        String name = info.getReferenceName();
        RpcClient client = clientMap.get(name);

        if (client != null && !client.isActive()) {
            try {
                client.close();
            } catch (Exception ex) {
                LOGGER.warn("Failed to close client for " + info, ex);
            }
            client = null;
        }

        if (client == null) {
            client = createClient(name);
            clientMap.put(name, client);
        }
        return client;
    }

    /** Builds a new delegate client that targets only the named host. */
    private RpcClient createClient(String referenceName) {
        return RpcClientFactory.getInstance(getClientConfigurationProperties(referenceName));
    }

    /**
     * Derives per-host properties: a copy of this client's configuration with
     * {@code client.type} and {@code hosts} overridden for a single host.
     */
    private Properties getClientConfigurationProperties(String referenceName) {
        Properties props = new Properties();
        props.putAll(configurationProperties);
        // NOTE(review): the value stored here is the enum constant itself, not a
        // String, so Properties.getProperty("client.type") will not return it.
        // Presumably the factory then falls back to its default type - confirm
        // against RpcClientFactory before changing this.
        props.put("client.type", RpcClientFactory.ClientType.DEFAULT);
        props.put("hosts", referenceName);
        return props;
    }

    /** Selector that yields the hosts in a freshly shuffled order on every call. */
    private static class RandomOrderHostSelector implements HostSelector {
        private List<HostInfo> hostList;
        private Random random = new Random(System.currentTimeMillis());

        private RandomOrderHostSelector() {
        }

        /**
         * Returns an iterator over all hosts in a uniformly random order. An
         * empty or single-element host list is handled without error.
         */
        @Override
        public synchronized Iterator<HostInfo> createHostIterator() {
            int size = hostList.size();
            List<Integer> shuffled = new ArrayList<Integer>(size);
            for (int i = 0; i < size; ++i) {
                shuffled.add(i);
            }
            Collections.shuffle(shuffled, random);

            int[] indexOrder = new int[size];
            for (int i = 0; i < size; ++i) {
                indexOrder[i] = shuffled.get(i);
            }
            return new SpecificOrderIterator<HostInfo>(indexOrder, hostList);
        }

        /** Stores an unmodifiable snapshot of the given hosts. */
        @Override
        public synchronized void setHosts(List<HostInfo> hosts) {
            hostList = Collections.unmodifiableList(new ArrayList<HostInfo>(hosts));
        }
    }

    /**
     * Selector that rotates the starting host by one position on every call and
     * then continues through the remaining hosts in list order, wrapping around.
     */
    private static class RoundRobinHostSelector implements HostSelector {
        /** Index of the host the next iterator will start from. */
        private int nextHead;
        private List<HostInfo> hostList;

        private RoundRobinHostSelector() {
        }

        /** Returns an iterator over all hosts starting at the current head. */
        @Override
        public synchronized Iterator<HostInfo> createHostIterator() {
            int size = hostList.size();
            int begin = nextHead++;
            // ">=" rather than "==" so the cursor also resets for an empty list.
            if (nextHead >= size) {
                nextHead = 0;
            }

            int[] indexOrder = new int[size];
            for (int offset = 0; offset < size; ++offset) {
                indexOrder[offset] = (begin + offset) % size;
            }
            return new SpecificOrderIterator<HostInfo>(indexOrder, hostList);
        }

        /** Stores an unmodifiable snapshot of the given hosts. */
        @Override
        public synchronized void setHosts(List<HostInfo> hosts) {
            hostList = Collections.unmodifiableList(new ArrayList<HostInfo>(hosts));
        }
    }

    /**
     * Strategy for choosing the order in which hosts are tried. Implementations
     * named via the {@code host-selector} property are instantiated reflectively
     * through a no-argument constructor.
     */
    public interface HostSelector {
        /** Supplies the hosts this selector should choose from. */
        void setHosts(List<HostInfo> hosts);

        /** Returns an iterator giving the hosts in the order they should be tried. */
        Iterator<HostInfo> createHostIterator();
    }
}

