package org.apache.pinot.tools.admin.command;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import javax.annotation.Nullable;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.tools.Command;
import org.apache.pinot.tools.PinotZKChanger;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name = "MoveReplicaGroup")
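/*
 * Admin command that moves segment replicas hosted on a set of source servers onto
 * destination servers by rewriting the table's IdealState. Only OFFLINE tables are
 * supported, and the command performs a dry run unless -exec is given.
 */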
public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Command {
    private static final Logger LOGGER = LoggerFactory.getLogger(MoveReplicaGroup.class);

    // Source servers, given either as a path to an existing file or as a comma-separated list.
    @CommandLine.Option(names = {"-srcHosts", "-s", "--src"}, required = true, description = {"File with names of source hosts or csv list of hostnames"})
    private String _srcHosts;

    // Table to operate on; validateParams() rewrites it to the OFFLINE table name with type.
    @CommandLine.Option(names = {"-tableName", "-t", "-table"}, required = true, description = {"Table name. Supports only OFFLINE table (type is optional)"})
    private String _tableName;

    // Zookeeper address; validateParams() may append the first component of _zkPath to it.
    @CommandLine.Option(names = {"-zkHost", "--zk", "-z"}, required = true, description = {"Zookeeper host:port"})
    private String _zkHost;

    // Zookeeper cluster path; validateParams() strips a leading '/' and may drop its first component.
    @CommandLine.Option(names = {"-zkPath", "--cluster", "-c"}, required = true, description = {"Zookeeper cluster path (Ex: /pinot)"})
    private String _zkPath;

    // Both are initialised at the start of execute().
    private ZKHelixAdmin _helix;
    private PinotZKChanger _zkChanger;

    // Optional file listing destination servers; when empty, servers are looked up by tenant tag.
    @CommandLine.Option(names = {"-destHostsFile", "-d", "--dest"}, required = false, description = {"File with destination servers list"})
    private String _destHostsFile = "";

    // Upper bound on the number of segments processed by computeNewIdealState().
    @CommandLine.Option(names = {"-maxSegmentsToMove", "-m", "--max"}, required = false, description = {"MaxSegmentsToMove"})
    private int _maxSegmentsToMove = Integer.MAX_VALUE;

    // When false (the default) the proposed IdealState is only printed, not applied.
    @CommandLine.Option(names = {"-exec"}, required = false, description = {"Execute replica group move. dryRun(default) if not specified"})
    private boolean _exec = false;

    @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, description = {"Prints help"})
    private boolean _help = false;

    /**
     * Mutable pairing of a server instance name with a segment counter.
     * Both fields are read and updated directly by the enclosing command.
     */
    public class ServerInstance {
        String _server;
        int _segments;

        /**
         * @param server   instance name of the server
         * @param segments initial segment count for that server
         */
        ServerInstance(String server, int segments) {
            _server = server;
            _segments = segments;
        }
    }

    /**
     * Mutable pairing of a segment name with a replica counter.
     * Both fields are read and updated directly by the enclosing command.
     */
    public class SourceSegments {
        String _segment;
        int _replicaCount;

        /**
         * @param segment      name of the segment
         * @param replicaCount initial replica count for that segment
         */
        SourceSegments(String segment, int replicaCount) {
            _segment = segment;
            _replicaCount = replicaCount;
        }
    }

    /**
     * @return {@code true} when the help option was supplied on the command line
     */
    @Override
    public boolean getHelp() {
        return _help;
    }

    /**
     * @return the fixed command name, {@code "MoveReplicaGroup"}
     */
    @Override
    public String getName() {
        final String commandName = "MoveReplicaGroup";
        return commandName;
    }

    /**
     * @return the one-line, human-readable summary of this command
     */
    @Override
    public String description() {
        final String summary = "Move complete set of segment replica from source servers to tagged servers in cluster";
        return summary;
    }

    /**
     * Renders the command with its required options, plus {@code -exec} when set.
     * The destination-hosts file and the max-segments limit are not included.
     */
    @Override
    public String toString() {
        StringBuilder command = new StringBuilder("MoveReplicaGroup");
        command.append(" -srcHosts ").append(_srcHosts);
        command.append(" -tableName ").append(_tableName);
        command.append(" -zkHost ").append(_zkHost);
        command.append(" -zkPath ").append(_zkPath);
        if (_exec) {
            command.append(" -exec");
        }
        return command.toString();
    }

    /** Intentionally a no-op: this command performs no work in its cleanup hook. */
    @Override
    public void cleanup() {
        // no-op
    }

    /**
     * Computes a new IdealState in which segment replicas on the source servers are
     * reassigned to destination servers, prints the current and the proposed state,
     * and applies the proposal when {@code -exec} was given.
     *
     * <p>Without {@code -exec} the JVM is terminated with status 0 after printing.
     *
     * @return {@code true} once the new IdealState has been applied and the table is
     *         stable; {@code false} when the table does not exist in the cluster
     * @throws IOException          if a hosts file or the table config cannot be read
     * @throws InterruptedException declared by the command contract
     */
    @Override
    public boolean execute() throws IOException, InterruptedException {
        validateParams();
        _zkChanger = new PinotZKChanger(_zkHost, _zkPath);
        _helix = _zkChanger.getHelixAdmin();
        if (!isExistingTable(_tableName)) {
            LOGGER.error("Table {} does not exist", _tableName);
            // Stop here: every step below reads the config or IdealState of this table.
            return false;
        }

        List<String> sourceHosts = readSourceHosts();
        LOGGER.info("Source hosts: {}", sourceHosts);
        LOGGER.debug("Using server tenant: {}", getServerTenantName(_tableName) + "_OFFLINE");
        List<String> destinationServers = readDestinationServers();
        LOGGER.info("Destination servers: {}", destinationServers);
        verifyServerLists(sourceHosts, destinationServers);

        Map<String, Map<String, String>> currentIdealState =
            _helix.getResourceIdealState(_zkPath, _tableName).getRecord().getMapFields();
        System.out.println("Existing idealstate:");
        printIdealState(currentIdealState);

        PriorityQueue<SourceSegments> segmentsToMove = getSegmentsToMoveQueue(currentIdealState, sourceHosts);
        PriorityQueue<ServerInstance> destinationQueue = getDestinationServerQueue(currentIdealState, destinationServers);
        Map<String, Map<String, String>> proposedIdealState =
            computeNewIdealState(currentIdealState, segmentsToMove, destinationQueue, sourceHosts);
        System.out.println("Proposed idealstate:");
        printIdealState(proposedIdealState);
        printDestinationServerCounts(destinationQueue);

        if (!_exec) {
            // Dry run: the proposal has been printed, nothing is written back.
            LOGGER.info("Run with -exec to apply this IdealState");
            System.exit(0);
        }
        applyIdealState(proposedIdealState);
        _zkChanger.waitForStable(_tableName);
        return true;
    }

    /**
     * Resolves the {@code -srcHosts} option into instance names. If the option value
     * names an existing file, the hosts are read from it; otherwise the value is split
     * as a comma-separated list. Exits the JVM with status 1 when the option is empty
     * or when the file yields no hosts.
     *
     * @return the source instance names
     * @throws IOException if the hosts file cannot be read
     */
    private List<String> readSourceHosts() throws IOException {
        if (_srcHosts.isEmpty()) {
            LOGGER.error("Source hosts(-s) are required");
            System.exit(1);
        }
        if (!new File(_srcHosts).exists()) {
            // Not a file on disk: interpret the value as a csv list of host names.
            String[] csvHosts = _srcHosts.split("\\s*,\\s*");
            return hostNameToInstanceNames(Arrays.asList(csvHosts));
        }
        List<String> instancesFromFile = readHostsFromFile(_srcHosts);
        if (instancesFromFile.isEmpty()) {
            LOGGER.error("Empty list of servers. Nothing to do");
            System.exit(1);
        }
        return instancesFromFile;
    }

    /**
     * Prints one "server : segments" line per queued destination server to stdout,
     * in the queue's iteration order.
     *
     * @param priorityQueue destination servers with their segment counts
     */
    private void printDestinationServerCounts(PriorityQueue<ServerInstance> priorityQueue) {
        System.out.println("Number of segments per server: ");
        for (ServerInstance destination : priorityQueue) {
            System.out.println(destination._server + " : " + destination._segments);
        }
    }

    /**
     * Prints the given segment-to-instance-state mapping to stdout as pretty JSON.
     *
     * @param map segment name to (instance name to state) mapping
     * @throws JsonProcessingException if the mapping cannot be serialized
     */
    private void printIdealState(Map<String, Map<String, String>> map) throws JsonProcessingException {
        String prettyJson = JsonUtils.objectToPrettyString(map);
        System.out.println(prettyJson);
    }

    /**
     * Writes the given per-segment assignments into the table's IdealState through
     * {@code HelixHelper.updateIdealState}, using an exponential-backoff retry policy.
     * Segments absent from {@code map} are left untouched.
     *
     * @param map segment name to (instance name to state) entries to store
     */
    private void applyIdealState(final Map<String, Map<String, String>> map) {
        Function<IdealState, IdealState> overwriteSegments = new Function<IdealState, IdealState>() {
            @Nullable
            @Override
            public IdealState apply(@Nullable IdealState idealState) {
                // Overwrite (or add) the entry of every segment present in the proposal.
                idealState.getRecord().getMapFields().putAll(map);
                return idealState;
            }
        };
        HelixHelper.updateIdealState(_zkChanger.getHelixManager(), _tableName, overwriteSegments,
            RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0d));
    }

    /**
     * Builds the proposed IdealState. Works on a copy of {@code map}; for each polled
     * segment (up to {@code _maxSegmentsToMove}) one replica located on a source host
     * is removed and an {@code ONLINE} replica on a destination server is added.
     *
     * @param map            current segment to (instance to state) mapping; not modified
     * @param priorityQueue  segments to move; polled by this method
     * @param priorityQueue2 destination servers; segment counts are updated as servers are picked
     * @param list           source instance names
     * @return the proposed mapping
     * @throws RuntimeException if no destination server or no source host is found for a segment
     */
    private Map<String, Map<String, String>> computeNewIdealState(Map<String, Map<String, String>> map, PriorityQueue<SourceSegments> priorityQueue, PriorityQueue<ServerInstance> priorityQueue2, List<String> list) {
        Map<String, Map<String, String>> proposed = copyIdealState(map);
        int processed = 0;
        while (processed < _maxSegmentsToMove && !priorityQueue.isEmpty()) {
            String segment = priorityQueue.poll()._segment;
            Map<String, String> assignment = proposed.get(segment);

            String target = getDestinationServer(priorityQueue2, assignment);
            if (target == null) {
                throw new RuntimeException("No destination server for segment: " + segment);
            }

            // First instance of this segment that is one of the source hosts.
            String sourceToDrop = null;
            for (String instance : assignment.keySet()) {
                if (list.contains(instance)) {
                    sourceToDrop = instance;
                    break;
                }
            }
            if (sourceToDrop == null) {
                throw new RuntimeException("Could not find a source host to remove for segment: " + segment);
            }

            assignment.remove(sourceToDrop);
            assignment.put(target, "ONLINE");
            processed++;
        }
        return proposed;
    }

    /**
     * Picks the first server, in queue order, that does not already appear in
     * {@code map}, increments its segment count, and returns its name. Every server
     * polled during the search is re-inserted into the queue before returning.
     *
     * @param priorityQueue candidate destination servers; must be non-null and non-empty
     * @param map           current instance to state assignment of the segment
     * @return the chosen server name, or {@code null} if every candidate is already in {@code map}
     */
    private String getDestinationServer(PriorityQueue<ServerInstance> priorityQueue, Map<String, String> map) {
        Preconditions.checkNotNull(priorityQueue);
        Preconditions.checkArgument(!priorityQueue.isEmpty());

        List<ServerInstance> polled = new ArrayList<>();
        String chosen = null;
        while (!priorityQueue.isEmpty()) {
            ServerInstance candidate = priorityQueue.poll();
            polled.add(candidate);
            if (map.containsKey(candidate._server)) {
                continue;
            }
            chosen = candidate._server;
            candidate._segments++;
            break;
        }

        // Re-insert so the queue re-orders itself around the updated count.
        for (ServerInstance server : polled) {
            priorityQueue.add(server);
        }
        return chosen;
    }

    /**
     * Returns a two-level copy of the mapping: a new outer map whose values are new
     * inner maps, so changes to the copy do not affect {@code map}.
     *
     * @param map segment name to (instance name to state) mapping
     * @return an independent copy with the same entries
     */
    private Map<String, Map<String, String>> copyIdealState(Map<String, Map<String, String>> map) {
        Map<String, Map<String, String>> copy = new HashMap<>(map);
        for (Map.Entry<String, Map<String, String>> segmentEntry : map.entrySet()) {
            Map<String, String> original = segmentEntry.getValue();
            Map<String, String> duplicate = new HashMap<>(original.size());
            for (Map.Entry<String, String> instanceState : original.entrySet()) {
                duplicate.put(instanceState.getKey(), instanceState.getValue());
            }
            // Replace the shared inner map reference with the independent duplicate.
            copy.put(segmentEntry.getKey(), duplicate);
        }
        return copy;
    }

    /**
     * Sanity-checks the two server lists and exits the JVM with status 1 when a source
     * host also appears among the destinations or when a destination instance is disabled.
     *
     * @param list  source instance names
     * @param list2 destination instance names
     */
    private void verifyServerLists(List<String> list, List<String> list2) {
        for (String sourceHost : list) {
            if (!list2.contains(sourceHost)) {
                continue;
            }
            LOGGER.error("Source host: {} is also present in destination list", sourceHost);
            LOGGER.error("Refusing to migrate replica group");
            System.exit(1);
        }

        boolean destinationHasDisabled = hasDisabledInstances("Destination", list2);
        if (destinationHasDisabled) {
            LOGGER.error("Destination server list has disabled instances. Retry after correcting input");
            System.exit(1);
        }
    }

    /**
     * Checks every listed instance against its Helix instance config and logs each one
     * that is not enabled. All instances are checked; the scan does not stop early.
     *
     * @param str  label used in the log message (for example "Destination")
     * @param list instance names to check
     * @return {@code true} if at least one instance is disabled
     */
    private boolean hasDisabledInstances(String str, List<String> list) {
        boolean anyDisabled = false;
        for (String instance : list) {
            boolean enabled = _helix.getInstanceConfig(_zkPath, instance).getInstanceEnabled();
            if (enabled) {
                continue;
            }
            LOGGER.error("{} instance: {} is disabled", str, instance);
            anyDisabled = true;
        }
        return anyDisabled;
    }

    /**
     * Builds a min-queue of destination servers ordered by how many segments each one
     * already has in the given IdealState mapping.
     *
     * @param map  segment name to (instance name to state) mapping
     * @param list destination instance names
     * @return a queue whose head is the destination server with the fewest segments
     */
    private PriorityQueue<ServerInstance> getDestinationServerQueue(Map<String, Map<String, String>> map, List<String> list) {
        Map<String, ServerInstance> serversByName = new HashMap<>(list.size());
        for (String server : list) {
            serversByName.put(server, new ServerInstance(server, 0));
        }

        // Count the existing segments on each destination server.
        for (Map<String, String> assignment : map.values()) {
            for (String instance : assignment.keySet()) {
                ServerInstance destination = serversByName.get(instance);
                if (destination != null) {
                    destination._segments++;
                }
            }
        }

        // PriorityQueue rejects an initial capacity below 1, so guard against an empty list.
        int initialCapacity = Math.max(1, list.size());
        PriorityQueue<ServerInstance> queue = new PriorityQueue<>(initialCapacity, new Comparator<ServerInstance>() {
            @Override
            public int compare(ServerInstance left, ServerInstance right) {
                // Integer.compare returns 0 for ties, as the Comparator contract requires.
                return Integer.compare(left._segments, right._segments);
            }
        });
        queue.addAll(serversByName.values());
        return queue;
    }

    /**
     * Builds a max-queue of the segments that have at least one replica on a source
     * host, ordered by the number of such replicas (most first).
     *
     * @param map  segment name to (instance name to state) mapping
     * @param list source instance names
     * @return a queue whose head is the segment with the most replicas on source hosts
     */
    private PriorityQueue<SourceSegments> getSegmentsToMoveQueue(Map<String, Map<String, String>> map, List<String> list) {
        // PriorityQueue rejects an initial capacity below 1, so guard against an empty mapping.
        int initialCapacity = Math.max(1, map.size());
        PriorityQueue<SourceSegments> queue = new PriorityQueue<>(initialCapacity, new Comparator<SourceSegments>() {
            @Override
            public int compare(SourceSegments left, SourceSegments right) {
                // Descending by replica count; returns 0 for ties, as the Comparator contract requires.
                return Integer.compare(right._replicaCount, left._replicaCount);
            }
        });

        for (Map.Entry<String, Map<String, String>> segmentEntry : map.entrySet()) {
            SourceSegments candidate = new SourceSegments(segmentEntry.getKey(), 0);
            for (String instance : segmentEntry.getValue().keySet()) {
                if (list.contains(instance)) {
                    candidate._replicaCount++;
                }
            }
            // Segments with no replica on a source host have nothing to move.
            if (candidate._replicaCount > 0) {
                queue.add(candidate);
            }
        }
        return queue;
    }

    /**
     * Validates and normalizes the command-line options. Exits the JVM with status 1
     * on an empty table name, a realtime table, or an empty zkHost/zkPath.
     *
     * <p>Side effects: {@code _tableName} becomes the OFFLINE table name with type, a
     * leading '/' is stripped from {@code _zkPath}, and when {@code _zkHost} carries no
     * path component the first component of {@code _zkPath} is appended to it while the
     * remaining components become the new {@code _zkPath}.
     */
    private void validateParams() {
        if (_tableName == null || _tableName.isEmpty()) {
            LOGGER.error("Table name is required and can not be empty");
            System.exit(1);
        }
        if (TableNameBuilder.isRealtimeTableResource(_tableName)) {
            LOGGER.error("This operation is not supported for realtime table. table: {}", _tableName);
            System.exit(1);
        }
        _tableName = TableNameBuilder.OFFLINE.tableNameWithType(_tableName);

        if (_zkHost.isEmpty() || _zkPath.isEmpty()) {
            LOGGER.error("zkHost or zkPath should not be empty");
            System.exit(1);
        }
        if (_zkPath.startsWith("/")) {
            _zkPath = _zkPath.substring(1);
        }

        String[] hostParts = _zkHost.split("/");
        String[] pathParts = _zkPath.split("/");
        boolean hostHasNoPath = hostParts.length == 1 || (hostParts.length == 2 && hostParts[1].isEmpty());
        if (hostHasNoPath) {
            // Move the first path component onto the host; the rest stays as the path.
            String[] remainingPath = Arrays.copyOfRange(pathParts, 1, pathParts.length);
            _zkHost = hostParts[0] + "/" + pathParts[0];
            _zkPath = Joiner.on("/").join(remainingPath);
        }
        LOGGER.info("Using zkHost: {}, zkPath: {}", _zkHost, _zkPath);
    }

    /**
     * Looks up the server tenant configured for the given table.
     *
     * @param str table name with type
     * @return the server tenant from the table's tenant config
     * @throws IOException declared for table-config loading failures
     */
    private String getServerTenantName(String str) throws IOException {
        TableConfig tableConfig = getTableConfig(str);
        return tableConfig.getTenantConfig().getServer();
    }

    /**
     * Loads the table config stored under {@code /CONFIGS/TABLE/<table>} in the
     * cluster's property store. A dedicated property store is opened for the lookup
     * and stopped again before returning, so no Zookeeper connection is left behind.
     *
     * @param str table name with type
     * @return the parsed table config
     * @throws IOException           declared for table-config loading failures
     * @throws IllegalStateException if no table config is stored for the table
     */
    private TableConfig getTableConfig(String str) throws IOException {
        ZkHelixPropertyStore<ZNRecord> propertyStore =
            new ZkHelixPropertyStore<>(_zkHost, new ZNRecordSerializer(), PropertyPathBuilder.propertyStore(_zkPath));
        try {
            ZNRecord record = propertyStore.get("/CONFIGS/TABLE/" + str, (Stat) null, 0);
            if (record == null) {
                // Fail with a descriptive message instead of an NPE while parsing.
                throw new IllegalStateException("Table config not found for table: " + str);
            }
            TableConfig tableConfig = TableConfigUtils.fromZNRecord(record);
            LOGGER.debug("Loaded table config");
            return tableConfig;
        } finally {
            // Release the store opened for this single lookup.
            propertyStore.stop();
        }
    }

    /**
     * @param str table name with type
     * @return {@code true} if the cluster's resource list contains the table
     */
    private boolean isExistingTable(String str) {
        List<String> clusterResources = _helix.getResourcesInCluster(_zkPath);
        return clusterResources.contains(str);
    }

    /**
     * Determines the destination servers: read from the {@code -destHostsFile} file
     * when one was given, otherwise the enabled instances tagged with the table's
     * server tenant plus the {@code _OFFLINE} suffix.
     *
     * @return destination instance names
     * @throws IOException if the hosts file or the table config cannot be read
     */
    private List<String> readDestinationServers() throws IOException {
        if (_destHostsFile.isEmpty()) {
            String tenantTag = getServerTenantName(_tableName) + "_OFFLINE";
            LOGGER.debug("Using server tenant: {}", tenantTag);
            return HelixHelper.getEnabledInstancesWithTag(_zkChanger.getHelixManager(), tenantTag);
        }
        return readHostsFromFile(_destHostsFile);
    }

    /**
     * Reads one host per line from the given file, using the platform default charset,
     * and converts the lines to instance names.
     *
     * @param str path of the hosts file
     * @return instance names derived from the file's lines
     * @throws IOException if the file cannot be read
     */
    private List<String> readHostsFromFile(String str) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(str), Charset.defaultCharset());
        return hostNameToInstanceNames(lines);
    }

    /**
     * Converts host names to instance names. Each entry is trimmed first; null, empty
     * and whitespace-only entries are skipped. An entry that splits on '_' into exactly
     * one part is wrapped as {@code Server_<host>_8001}; any other entry is kept as is.
     *
     * @param list host names or instance names, for example the lines of a hosts file
     * @return the resulting instance names, in input order
     */
    private List<String> hostNameToInstanceNames(List<String> list) {
        List<String> instanceNames = new ArrayList<>(list.size());
        for (String entry : list) {
            // Trim so padded or whitespace-only lines do not become bogus instance names.
            String host = entry == null ? "" : entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            boolean isBareHostName = host.split("_").length == 1;
            instanceNames.add(isBareHostName ? "Server_" + host + "_8001" : host);
        }
        return instanceNames;
    }

    /**
     * Command-line entry point. Parses the arguments with picocli, runs the command,
     * and terminates the JVM with a non-zero status when parsing or execution fails.
     *
     * @param strArr raw command-line arguments
     * @throws Exception declared for compatibility; failures are logged and reported via the exit status
     */
    public static void main(String[] strArr) throws Exception {
        try {
            int exitCode = new CommandLine(new MoveReplicaGroup()).execute(strArr);
            if (exitCode != 0) {
                // Surface the failure to the caller instead of discarding picocli's exit code.
                LOGGER.error("MoveReplicaGroup exited with code {}", exitCode);
                System.exit(exitCode);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to parse/execute with arguments", e);
            System.exit(1);
        }
    }
}
