package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.class */
public class RackAwareTaskAssignorTest {
    // Cost weights passed to activeTasksCost(...) / optimizeActiveTasks(...) by the tests below;
    // both are assigned in setUp() depending on the `stateful` parameter.
    private int trafficCost;
    private int nonOverlapCost;

    // Parameter at index 0 of each row returned by getParamStoreType().
    @Parameterized.Parameter
    public boolean stateful;

    // Parameter at index 1: the strategy name handed to AssignmentTestUtils.configProps(...)
    // when building the "enabled" assignment configs.
    @Parameterized.Parameter(1)
    public String assignmentStrategy;
    // Fixtures created once per test instance and passed to every RackAwareTaskAssignor built here.
    private final MockTime time = new MockTime();
    // Note: this config is always built with "min_traffic", independent of `assignmentStrategy`;
    // it is only used to construct the mock internal topic manager below.
    private final StreamsConfig streamsConfig = new StreamsConfig(AssignmentTestUtils.configProps("min_traffic"));
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.time, this.streamsConfig, this.mockClientSupplier.restoreConsumer, false);

    /**
     * Supplies the parameter rows for the parameterized runner: each row is
     * {@code {stateful, assignmentStrategy}}.
     *
     * @return the four rows, in the order they are declared below
     */
    @Parameterized.Parameters(name = "stateful={0}, Strategy={1}")
    public static Collection<Object[]> getParamStoreType() {
        final Object[] statefulBalanced = {true, "balance_subtopology"};
        final Object[] statelessMinTraffic = {false, "min_traffic"};
        final Object[] statefulMinTraffic = {true, "min_traffic"};
        final Object[] statelessBalanced = {false, "balance_subtopology"};
        return Arrays.asList(statefulBalanced, statelessMinTraffic, statefulMinTraffic, statelessBalanced);
    }

    /**
     * Picks the cost weights for the current run: (10, 1) when {@code stateful} is set,
     * (1, 0) otherwise.
     */
    @Before
    public void setUp() {
        trafficCost = stateful ? 10 : 1;
        nonOverlapCost = stateful ? 1 : 0;
    }

    /**
     * Builds assignment configs from the properties produced for the strategy under test
     * ({@code assignmentStrategy}).
     */
    private AssignmentConfigs getRackAwareEnabledConfig() {
        final StreamsConfig config = new StreamsConfig(AssignmentTestUtils.configProps(assignmentStrategy));
        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config.originals());
        return assignorConfiguration.assignmentConfigs();
    }

    /**
     * Same as {@code getRackAwareEnabledConfig()}, but forwards {@code i} as the second
     * argument of {@code AssignmentTestUtils.configProps} (presumably the standby replica
     * count, judging by this method's name; confirm against that helper).
     */
    private AssignmentConfigs getRackAwareEnabledConfigWithStandby(int i) {
        final StreamsConfig config = new StreamsConfig(AssignmentTestUtils.configProps(assignmentStrategy, i));
        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config.originals());
        return assignorConfiguration.assignmentConfigs();
    }

    /** Builds assignment configs from the properties produced for the {@code "none"} strategy. */
    private AssignmentConfigs getRackAwareDisabledConfig() {
        final StreamsConfig config = new StreamsConfig(AssignmentTestUtils.configProps("none"));
        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config.originals());
        return assignorConfiguration.assignmentConfigs();
    }

    /**
     * With the disabled config, the assignor must report that it cannot be enabled and must
     * never call {@code populateTopicsToDescribe}, for either value of its boolean flag.
     */
    @Test
    public void shouldDisableAssignorFromConfig() {
        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(true),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareDisabledConfig(),
            time
        ));

        Assert.assertFalse(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
    }

    /**
     * Uses the task/partition map built with {@code getTaskTopicPartitionMapForTask0(true)}
     * and expects the assignor to stay disabled after exactly one
     * {@code populateTopicsToDescribe(..., false)} call and none with {@code true}.
     */
    @Test
    public void shouldDisableActiveWhenMissingClusterInfo() {
        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(true),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        ));

        Assert.assertFalse(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
        // A direct call on the spy must also report false.
        Assert.assertFalse(assignor.populateTopicsToDescribe(new HashSet<>(), false));
    }

    /**
     * Uses the cluster from {@code getClusterWithPartitionMissingRack()} and expects the
     * assignor to stay disabled after exactly one {@code populateTopicsToDescribe(..., false)}
     * call and none with {@code true}.
     */
    @Test
    public void shouldDisableActiveWhenRackMissingInNode() {
        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterWithPartitionMissingRack(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        ));

        Assert.assertFalse(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
        // A direct call on the spy must also report false.
        Assert.assertFalse(assignor.populateTopicsToDescribe(new HashSet<>(), false));
    }

    /**
     * With process racks built by {@code getProcessRacksForProcess0(true)},
     * {@code validClientRack()} must return false.
     */
    @Test
    public void shouldReturnInvalidClientRackWhenRackMissingInClientConsumer() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(true),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final boolean clientRackValid = assignor.validClientRack();
        Assert.assertFalse(clientRackValid);
    }

    /**
     * With process racks built by {@code getProcessWithNoConsumerRacks()},
     * {@code validClientRack()} must return false.
     */
    @Test
    public void shouldReturnFalseWhenRackMissingInProcess() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessWithNoConsumerRacks(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final boolean clientRackValid = assignor.validClientRack();
        Assert.assertFalse(clientRackValid);
    }

    /**
     * {@code racksForProcess()} must map {@code PID_1} to {@code RACK_1} for the
     * single-process fixture.
     */
    @Test
    public void shouldPopulateRacksForProcess() {
        final Map<?, ?> expectedRacks = Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.PID_1, AssignmentTestUtils.RACK_1));
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        Assert.assertEquals(expectedRacks, assignor.racksForProcess());
    }

    /**
     * Two consumers of the same process ({@code PID_1}) report different racks;
     * {@code validClientRack()} must return false.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldReturnInvalidClientRackWhenRackDiffersInSameProcess() {
        // consumer name -> rack, for the single process under test
        final Map<String, Optional<String>> consumerRacks = new HashMap<>();
        consumerRacks.put("consumer1", Optional.of(AssignmentTestUtils.RACK_1));
        consumerRacks.put("consumer2", Optional.of(AssignmentTestUtils.RACK_2));

        final Map processRacks = new HashMap();
        processRacks.put(AssignmentTestUtils.PID_1, consumerRacks);

        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            processRacks,
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        Assert.assertFalse(assignor.validClientRack());
    }

    /**
     * With the {@code getClusterForTopic0()} cluster and the enabled config,
     * {@code canEnableRackAwareAssignor()} must return true.
     */
    @Test
    public void shouldEnableRackAwareAssignorWithoutDescribingTopics() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final boolean enabled = assignor.canEnableRackAwareAssignor();
        Assert.assertTrue(enabled);
    }

    /**
     * The first {@code canEnableRackAwareAssignor()} call invokes
     * {@code populateTopicsToDescribe(..., false)} once; after resetting the spy, a second
     * call must still return true without invoking it again.
     */
    @Test
    public void shouldEnableRackAwareAssignorWithCacheResult() {
        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterForTopic0(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        ));

        // First call: the check actually runs.
        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));

        // Clear the recorded interactions, then call again.
        Mockito.reset(assignor);
        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
    }

    /**
     * Stubs {@code getTopicPartitionInfo} on a spied topic manager for {@code TP_0_NAME} and
     * expects the assignor, built on the {@code getClusterWithNoNode()} cluster, to be enabled.
     */
    @Test
    public void shouldEnableRackAwareAssignorWithDescribingTopics() {
        final MockInternalTopicManager spiedTopicManager = Mockito.spy(mockInternalTopicManager);
        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_1), Collections.emptyList());
        Mockito.doReturn(Collections.singletonMap(AssignmentTestUtils.TP_0_NAME, Collections.singletonList(partitionInfo)))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.TP_0_NAME));

        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            getClusterWithNoNode(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            spiedTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
    }

    /**
     * Stubs {@code getTopicPartitionInfo} for both the source topic and the changelog topic,
     * then expects the assignor to be enabled, to have called
     * {@code populateTopicsToDescribe} once with each boolean flag, and to expose
     * {@code RACK_1}/{@code RACK_2} for both partitions.
     */
    @Test
    public void shouldEnableRackAwareAssignorWithStandbyDescribingTopics() {
        final MockInternalTopicManager spiedTopicManager = Mockito.spy(mockInternalTopicManager);

        final TopicPartitionInfo sourceInfo = new TopicPartitionInfo(0, AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_1), Collections.emptyList());
        Mockito.doReturn(Collections.singletonMap(AssignmentTestUtils.TP_0_NAME, Collections.singletonList(sourceInfo)))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.TP_0_NAME));

        final TopicPartitionInfo changelogInfo = new TopicPartitionInfo(0, AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_1), Collections.emptyList());
        Mockito.doReturn(Collections.singletonMap(AssignmentTestUtils.CHANGELOG_TP_0_NAME, Collections.singletonList(changelogInfo)))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.CHANGELOG_TP_0_NAME));

        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterWithNoNode(),
            getTaskTopicPartitionMapForTask0(),
            getTaskChangeLogTopicPartitionMapForTask0(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            spiedTopicManager,
            getRackAwareEnabledConfigWithStandby(1),
            time
        ));

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
        Assert.assertEquals(
            Utils.mkMap(
                Utils.mkEntry(AssignmentTestUtils.TP_0_0, Utils.mkSet(AssignmentTestUtils.RACK_1, AssignmentTestUtils.RACK_2)),
                Utils.mkEntry(AssignmentTestUtils.CHANGELOG_TP_0_0, Utils.mkSet(AssignmentTestUtils.RACK_1, AssignmentTestUtils.RACK_2))
            ),
            assignor.racksForPartition()
        );
    }

    /**
     * The source-topic lookup is stubbed to succeed while the changelog-topic lookup throws a
     * {@code TimeoutException}; the assignor must end up disabled after calling
     * {@code populateTopicsToDescribe} once with each boolean flag.
     */
    @Test
    public void shouldDisableRackAwareAssignorWithStandbyDescribingTopicsFailure() {
        final MockInternalTopicManager spiedTopicManager = Mockito.spy(mockInternalTopicManager);

        final TopicPartitionInfo sourceInfo = new TopicPartitionInfo(0, AssignmentTestUtils.NODE_0, Arrays.asList(AssignmentTestUtils.REPLICA_1), Collections.emptyList());
        Mockito.doReturn(Collections.singletonMap(AssignmentTestUtils.TP_0_NAME, Collections.singletonList(sourceInfo)))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.TP_0_NAME));
        Mockito.doThrow(new TimeoutException("Timeout describing topic"))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.CHANGELOG_TP_0_NAME));

        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterWithNoNode(),
            getTaskTopicPartitionMapForTask0(),
            getTaskChangeLogTopicPartitionMapForTask0(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            spiedTopicManager,
            getRackAwareEnabledConfigWithStandby(1),
            time
        ));

        Assert.assertFalse(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
    }

    /**
     * The source-topic lookup throws a {@code TimeoutException}; the assignor must end up
     * disabled after a single {@code populateTopicsToDescribe(..., false)} call, while a
     * direct {@code populateTopicsToDescribe} call on the spy still returns true.
     */
    @Test
    public void shouldDisableRackAwareAssignorWithDescribingTopicsFailure() {
        final MockInternalTopicManager spiedTopicManager = Mockito.spy(mockInternalTopicManager);
        Mockito.doThrow(new TimeoutException("Timeout describing topic"))
            .when(spiedTopicManager)
            .getTopicPartitionInfo(Collections.singleton(AssignmentTestUtils.TP_0_NAME));

        final RackAwareTaskAssignor assignor = Mockito.spy(new RackAwareTaskAssignor(
            getClusterWithNoNode(),
            getTaskTopicPartitionMapForTask0(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            getProcessRacksForProcess0(),
            spiedTopicManager,
            getRackAwareEnabledConfig(),
            time
        ));

        Assert.assertFalse(assignor.canEnableRackAwareAssignor());
        Mockito.verify(assignor, Mockito.times(1)).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(false));
        Mockito.verify(assignor, Mockito.never()).populateTopicsToDescribe(ArgumentMatchers.anySet(), ArgumentMatchers.eq(true));
        Assert.assertTrue(assignor.populateTopicsToDescribe(new HashSet<>(), false));
    }

    /**
     * Optimizing an empty task set must report zero cost before and after, and must leave the
     * client's existing active tasks untouched.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldOptimizeEmptyActiveTasks() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState client = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        client.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));

        final TreeMap clientStates = new TreeMap(Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.PID_1, client)));
        final SortedSet<TaskId> noTasks = Utils.mkSortedSet();

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        Assert.assertEquals(0L, assignor.activeTasksCost(noTasks, clientStates, trafficCost, nonOverlapCost));
        Assert.assertEquals(0L, assignor.optimizeActiveTasks(noTasks, clientStates, trafficCost, nonOverlapCost));
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1), client.activeTasks());
    }

    /**
     * Three clients and four tasks across two subtopologies: checks the cost before
     * optimization, the cost returned by the optimization, and the resulting task placement.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldOptimizeActiveTasks() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            Utils.mkMap(
                Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)),
                Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1))
            ),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState firstClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState secondClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState thirdClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        firstClient.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));
        secondClient.assignActive(AssignmentTestUtils.TASK_1_0);
        thirdClient.assignActive(AssignmentTestUtils.TASK_0_0);

        final TreeMap clientStates = new TreeMap(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, secondClient),
            Utils.mkEntry(AssignmentTestUtils.PID_3, thirdClient)
        ));
        final SortedSet<TaskId> taskIds = Utils.mkSortedSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final long expectedOriginalCost = stateful ? 40 : 4;
        Assert.assertEquals(expectedOriginalCost, assignor.activeTasksCost(taskIds, clientStates, trafficCost, nonOverlapCost));

        final long expectedOptimizedCost = stateful ? 4 : 0;
        Assert.assertEquals(expectedOptimizedCost, assignor.optimizeActiveTasks(taskIds, clientStates, trafficCost, nonOverlapCost));

        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_0), firstClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_1_1), secondClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_1), thirdClient.activeTasks());
    }

    /**
     * Runs the optimizer on a randomly generated cluster and client assignment and checks that
     * <ul>
     *   <li>the cost returned by the optimization does not exceed the cost of the assignment
     *       as it was <em>before</em> optimization,</li>
     *   <li>every client keeps the same number of active tasks, and</li>
     *   <li>for the {@code "balance_subtopology"} strategy, the result passes
     *       {@code AssignmentTestUtils.assertBalancedTasks}.</li>
     * </ul>
     */
    @Test
    public void shouldOptimizeRandomActive() {
        final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = AssignmentTestUtils.getTaskTopicPartitionMap(40, 3, false);
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getRandomCluster(30, 40, 3),
            taskTopicPartitionMap,
            Utils.mkMap(),
            AssignmentTestUtils.getTasksForTopicGroup(40, 3),
            AssignmentTestUtils.getRandomProcessRacks(30, 30),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
        final SortedMap<ProcessId, ClientState> clientStateMap = AssignmentTestUtils.getRandomClientState(30, 40, 3, 1, taskIds);
        final Map<ProcessId, Integer> clientTaskCount = AssignmentTestUtils.clientTaskCount(clientStateMap, ClientState::activeTaskCount);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        // The baseline must be captured BEFORE optimizeActiveTasks runs: the optimizer mutates
        // clientStateMap, so measuring afterwards would compare the optimized cost with itself.
        final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
        final long optimizedCost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
        MatcherAssert.assertThat(optimizedCost, Matchers.lessThanOrEqualTo(originalCost));

        // Optimization may move tasks between clients but must not change per-client task counts.
        for (final Map.Entry<ProcessId, ClientState> entry : clientStateMap.entrySet()) {
            Assert.assertEquals(clientTaskCount.get(entry.getKey()).intValue(), entry.getValue().activeTasks().size());
        }

        if (assignmentStrategy.equals("balance_subtopology")) {
            AssignmentTestUtils.assertBalancedTasks(clientStateMap);
        }
    }

    /**
     * With a traffic cost of 0 (and a non-overlap cost of 1) the initial random assignment
     * already costs 0. For the {@code "min_traffic"} strategy the optimizer must therefore
     * return 0 and leave every task on the client that owned it beforehand; for any other
     * strategy only the validity of the resulting assignment is checked.
     */
    @Test
    public void shouldMaintainOriginalAssignmentForMinCost() {
        final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = AssignmentTestUtils.getTaskTopicPartitionMap(40, 3, false);
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getRandomCluster(20, 40, 3),
            taskTopicPartitionMap,
            Utils.mkMap(),
            AssignmentTestUtils.getTasksForTopicGroup(40, 3),
            AssignmentTestUtils.getRandomProcessRacks(30, 20),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
        final SortedMap<ProcessId, ClientState> clientStateMap = AssignmentTestUtils.getRandomClientState(30, 40, 3, 1, taskIds);

        // Snapshot which client owns each task before optimization so it can be compared later.
        final Map<TaskId, ProcessId> originalOwners = new HashMap<>();
        for (final Map.Entry<ProcessId, ClientState> entry : clientStateMap.entrySet()) {
            entry.getValue().activeTasks().forEach(taskId -> originalOwners.put(taskId, entry.getKey()));
        }
        // Every task must have exactly one owner in the snapshot.
        Assert.assertEquals(taskIds.size(), originalOwners.size());

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        Assert.assertEquals(0L, assignor.activeTasksCost(taskIds, clientStateMap, 0, 1));

        final long optimizedCost = assignor.optimizeActiveTasks(taskIds, clientStateMap, 0, 1);
        if (!assignmentStrategy.equals("min_traffic")) {
            AssignmentTestUtils.assertValidAssignment(0, taskIds, Collections.emptySet(), clientStateMap, new StringBuilder());
            return;
        }

        Assert.assertEquals(0L, optimizedCost);
        // The assignment must be unchanged: each task is still on its original client.
        for (final Map.Entry<TaskId, ProcessId> owner : originalOwners.entrySet()) {
            Assert.assertTrue(clientStateMap.get(owner.getValue()).hasAssignedTask(owner.getKey()));
        }
    }

    /**
     * Three clients but only two tasks: checks the cost before optimization, the cost returned
     * by the optimization, and that the first client still ends up with no active task.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldOptimizeActiveTasksWithMoreClients() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            Utils.mkMap(
                Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0)),
                Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_1_0))
            ),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState firstClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState secondClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState thirdClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        secondClient.assignActive(AssignmentTestUtils.TASK_1_0);
        thirdClient.assignActive(AssignmentTestUtils.TASK_0_0);

        final TreeMap clientStates = new TreeMap(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, secondClient),
            Utils.mkEntry(AssignmentTestUtils.PID_3, thirdClient)
        ));
        final SortedSet<TaskId> taskIds = Utils.mkSortedSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_0);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final long expectedOriginalCost = stateful ? 20 : 2;
        Assert.assertEquals(expectedOriginalCost, assignor.activeTasksCost(taskIds, clientStates, trafficCost, nonOverlapCost));

        final long expectedOptimizedCost = stateful ? 2 : 0;
        Assert.assertEquals(expectedOptimizedCost, assignor.optimizeActiveTasks(taskIds, clientStates, trafficCost, nonOverlapCost));

        Assert.assertEquals(Utils.mkSet(), firstClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_0), secondClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_1_0), thirdClient.activeTasks());
    }

    /**
     * Three clients and three tasks, with one client initially holding two of them: checks the
     * cost before optimization, the cost returned by the optimization, and the final placement.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldOptimizeActiveTasksWithMoreClientsWithMoreThanOneTask() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            Utils.mkMap(
                Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)),
                Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_1_0))
            ),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState firstClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState secondClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState thirdClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        secondClient.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0));
        thirdClient.assignActive(AssignmentTestUtils.TASK_0_0);

        final TreeMap clientStates = new TreeMap(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, secondClient),
            Utils.mkEntry(AssignmentTestUtils.PID_3, thirdClient)
        ));
        final SortedSet<TaskId> taskIds = Utils.mkSortedSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final long expectedOriginalCost = stateful ? 20 : 2;
        Assert.assertEquals(expectedOriginalCost, assignor.activeTasksCost(taskIds, clientStates, trafficCost, nonOverlapCost));

        final long expectedOptimizedCost = stateful ? 2 : 0;
        Assert.assertEquals(expectedOptimizedCost, assignor.optimizeActiveTasks(taskIds, clientStates, trafficCost, nonOverlapCost));

        Assert.assertEquals(Utils.mkSet(), firstClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1), secondClient.activeTasks());
        Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_1_0), thirdClient.activeTasks());
    }

    /**
     * Two clients and three tasks: the cost is the same before and after optimization, and the
     * expected placement depends on the run parameters. It stays as assigned when
     * {@code stateful} is set or the strategy is {@code "min_traffic"}; otherwise the tasks
     * are regrouped.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldBalanceAssignmentWithMoreCost() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            Utils.mkMap(
                Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)),
                Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_1_1))
            ),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState firstClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState secondClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        firstClient.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1));
        secondClient.assignActive(AssignmentTestUtils.TASK_0_1);

        final TreeMap clientStates = new TreeMap(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_2, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_5, secondClient)
        ));
        final SortedSet<TaskId> taskIds = Utils.mkSortedSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final int expectedCost = stateful ? 10 : 1;
        Assert.assertEquals(expectedCost, assignor.activeTasksCost(taskIds, clientStates, trafficCost, nonOverlapCost));
        Assert.assertEquals(expectedCost, assignor.optimizeActiveTasks(taskIds, clientStates, trafficCost, nonOverlapCost));

        final boolean keepsInitialPlacement = stateful || assignmentStrategy.equals("min_traffic");
        if (keepsInitialPlacement) {
            Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1), firstClient.activeTasks());
            Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_1), secondClient.activeTasks());
        } else {
            Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1), firstClient.activeTasks());
            Assert.assertEquals(Utils.mkSet(AssignmentTestUtils.TASK_1_1), secondClient.activeTasks());
        }
    }

    /**
     * Calling {@code optimizeActiveTasks} without first calling
     * {@code canEnableRackAwareAssignor()} must fail with an {@code IllegalStateException}
     * carrying the message asserted below.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldThrowIfMissingCallcanEnableRackAwareAssignor() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            time
        );

        final ClientState firstClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState secondClient = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        firstClient.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1));
        secondClient.assignActive(AssignmentTestUtils.TASK_0_1);

        final TreeMap clientStates = new TreeMap(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_2, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_5, secondClient)
        ));
        final SortedSet<TaskId> taskIds = Utils.mkSortedSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1);

        // Deliberately no canEnableRackAwareAssignor() call before optimizing.
        final Exception thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> assignor.optimizeActiveTasks(taskIds, clientStates, trafficCost, nonOverlapCost)
        );
        Assertions.assertEquals("TopicPartition topic0-0 has no rack information. Maybe forgot to call canEnableRackAwareAssignor first", thrown.getMessage());
    }

    /**
     * Expects {@code optimizeActiveTasks} to fail with an {@link IllegalArgumentException} when
     * the same task ({@code TASK_1_1}) is assigned as active to both clients.
     */
    @Test
    public void shouldThrowIfTaskInMultipleClients() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            this.mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            this.time
        );

        final SortedSet<TaskId> allTasks = Utils.mkSortedSet(
            AssignmentTestUtils.TASK_0_0,
            AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_1
        );

        // TASK_1_1 appears in both assignments on purpose.
        final ClientState stateOfPid2 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        stateOfPid2.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1));
        final ClientState stateOfPid5 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        stateOfPid5.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));

        final SortedMap clientStates = new TreeMap<>(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_2, stateOfPid2),
            Utils.mkEntry(AssignmentTestUtils.PID_5, stateOfPid5)
        ));

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final IllegalArgumentException thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> assignor.optimizeActiveTasks(allTasks, clientStates, this.trafficCost, this.nonOverlapCost)
        );
        Assertions.assertEquals(
            "Task 1_1 assigned to multiple clients 00000000-0000-0000-0000-000000000005, 00000000-0000-0000-0000-000000000002",
            thrown.getMessage()
        );
    }

    /**
     * Expects {@code optimizeActiveTasks} to fail with an {@link IllegalArgumentException} when
     * the task set contains a task ({@code TASK_1_0}) that no client has been assigned.
     */
    @Test
    public void shouldThrowIfTaskMissingInClients() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            this.mockInternalTopicManager,
            getRackAwareEnabledConfig(),
            this.time
        );

        // TASK_1_0 is listed here but never handed to either client below.
        final SortedSet<TaskId> allTasks = Utils.mkSortedSet(
            AssignmentTestUtils.TASK_0_0,
            AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_0,
            AssignmentTestUtils.TASK_1_1
        );

        final ClientState stateOfPid2 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        stateOfPid2.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1));
        final ClientState stateOfPid5 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        stateOfPid5.assignActive(AssignmentTestUtils.TASK_0_1);

        final SortedMap clientStates = new TreeMap<>(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_2, stateOfPid2),
            Utils.mkEntry(AssignmentTestUtils.PID_5, stateOfPid5)
        ));

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());

        final IllegalArgumentException thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> assignor.optimizeActiveTasks(allTasks, clientStates, this.trafficCost, this.nonOverlapCost)
        );
        Assertions.assertEquals("Task 1_0 not assigned to any client", thrown.getMessage());
    }

    /**
     * Calls the standby cost computation with an empty task set and the standby optimization on
     * clients that hold only active tasks, and expects both to report a cost of zero.
     */
    @Test
    public void shouldNotCrashForEmptyStandby() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            Utils.mkMap(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            AssignmentTestUtils.mockInternalTopicManagerForChangelog(),
            getRackAwareEnabledConfigWithStandby(1),
            this.time
        );

        final ClientState stateOfPid1 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState stateOfPid2 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final ClientState stateOfPid3 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);

        // Only active tasks are assigned; no client receives a standby task.
        stateOfPid1.assignActiveTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));
        stateOfPid2.assignActive(AssignmentTestUtils.TASK_1_0);
        stateOfPid3.assignActive(AssignmentTestUtils.TASK_0_0);

        final SortedMap clientStates = new TreeMap<>(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, stateOfPid1),
            Utils.mkEntry(AssignmentTestUtils.PID_2, stateOfPid2),
            Utils.mkEntry(AssignmentTestUtils.PID_3, stateOfPid3)
        ));
        final SortedSet<TaskId> noTasks = new TreeSet<>();

        Assert.assertEquals(0L, assignor.standByTasksCost(noTasks, clientStates, 10, 1));
        Assert.assertEquals(
            0L,
            assignor.optimizeStandbyTasks(clientStates, 10, 1, (source, destination, task, allClients) -> true)
        );
    }

    /**
     * Optimizes a fixed standby layout over six clients using a movement predicate that always
     * returns {@code true}, expecting the standby cost to drop from 60 to 20.
     */
    @Test
    public void shouldOptimizeStandbyTasksWhenTasksAllMovable() {
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            AssignmentTestUtils.getTaskChangelogMapForAllTasks(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            AssignmentTestUtils.mockInternalTopicManagerForChangelog(),
            getRackAwareEnabledConfigWithStandby(2),
            this.time
        );

        final ClientState clientPid1 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState clientPid2 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final ClientState clientPid3 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);
        final ClientState clientPid4 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_4);
        final ClientState clientPid6 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_6);
        final ClientState clientPid7 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_7);

        final SortedMap<ProcessId, ClientState> clientStateMap = new TreeMap<>(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, clientPid1),
            Utils.mkEntry(AssignmentTestUtils.PID_2, clientPid2),
            Utils.mkEntry(AssignmentTestUtils.PID_3, clientPid3),
            Utils.mkEntry(AssignmentTestUtils.PID_4, clientPid4),
            Utils.mkEntry(AssignmentTestUtils.PID_6, clientPid6),
            Utils.mkEntry(AssignmentTestUtils.PID_7, clientPid7)
        ));

        // One active task per client.
        clientPid1.assignActive(AssignmentTestUtils.TASK_0_0);
        clientPid2.assignActive(AssignmentTestUtils.TASK_0_1);
        clientPid3.assignActive(AssignmentTestUtils.TASK_1_0);
        clientPid4.assignActive(AssignmentTestUtils.TASK_1_1);
        clientPid6.assignActive(AssignmentTestUtils.TASK_0_2);
        clientPid7.assignActive(AssignmentTestUtils.TASK_1_2);

        // Two standby tasks per client; every task ends up with two standby copies.
        clientPid1.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));
        clientPid2.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_0));
        clientPid3.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2));
        clientPid4.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_2));
        clientPid6.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_2));
        clientPid7.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_1));

        final SortedSet<TaskId> taskIds = new TreeSet<>(Utils.mkSet(
            AssignmentTestUtils.TASK_0_0,
            AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0,
            AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2
        ));

        // Snapshot of per-client standby counts taken before optimization.
        final Map<ProcessId, Integer> standbyCountsBefore =
            AssignmentTestUtils.clientTaskCount(clientStateMap, ClientState::standbyTaskCount);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 2, false, null);

        final long costBefore = assignor.standByTasksCost(taskIds, clientStateMap, 10, 1);
        Assert.assertEquals(60L, costBefore);

        final long costAfter = assignor.optimizeStandbyTasks(
            clientStateMap, 10, 1, (source, destination, task, allClients) -> true);
        Assert.assertEquals(20L, costAfter);

        // NOTE(review): the boolean flag flips to true here; its exact meaning is defined in
        // AssignmentTestUtils.verifyStandbySatisfyRackReplica - confirm there.
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 2, true, standbyCountsBefore);
    }

    /**
     * Optimizes a fixed standby layout over six clients, restricting moves with the
     * {@code isAllowedTaskMovement} predicate of the {@link StandbyTaskAssignor} that
     * {@link StandbyTaskAssignorFactory} creates for the same configuration. The standby cost is
     * expected to drop from 60 to 50.
     */
    @Test
    public void shouldOptimizeStandbyTasksWithMovingConstraint() {
        final AssignmentConfigs assignorConfig = getRackAwareEnabledConfigWithStandby(2);
        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getClusterForAllTopics(),
            AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(),
            AssignmentTestUtils.getTaskChangelogMapForAllTasks(),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getProcessRacksForAllProcess(),
            AssignmentTestUtils.mockInternalTopicManagerForChangelog(),
            assignorConfig,
            this.time
        );

        final ClientState clientPid1 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState clientPid2 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final ClientState clientPid3 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);
        final ClientState clientPid4 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_4);
        final ClientState clientPid6 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_6);
        final ClientState clientPid7 = new ClientState(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_7);

        final SortedMap<ProcessId, ClientState> clientStateMap = new TreeMap<>(Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, clientPid1),
            Utils.mkEntry(AssignmentTestUtils.PID_2, clientPid2),
            Utils.mkEntry(AssignmentTestUtils.PID_3, clientPid3),
            Utils.mkEntry(AssignmentTestUtils.PID_4, clientPid4),
            Utils.mkEntry(AssignmentTestUtils.PID_6, clientPid6),
            Utils.mkEntry(AssignmentTestUtils.PID_7, clientPid7)
        ));

        // One active task per client.
        clientPid1.assignActive(AssignmentTestUtils.TASK_0_0);
        clientPid2.assignActive(AssignmentTestUtils.TASK_0_1);
        clientPid3.assignActive(AssignmentTestUtils.TASK_1_0);
        clientPid4.assignActive(AssignmentTestUtils.TASK_1_1);
        clientPid6.assignActive(AssignmentTestUtils.TASK_0_2);
        clientPid7.assignActive(AssignmentTestUtils.TASK_1_2);

        // Two standby tasks per client; every task ends up with two standby copies.
        clientPid1.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_1));
        clientPid2.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_0));
        clientPid3.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2));
        clientPid4.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_2));
        clientPid6.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_2));
        clientPid7.assignStandbyTasks(Utils.mkSet(AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_1));

        final SortedSet<TaskId> taskIds = new TreeSet<>(Utils.mkSet(
            AssignmentTestUtils.TASK_0_0,
            AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0,
            AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2
        ));

        // Snapshot of per-client standby counts taken before optimization.
        final Map<ProcessId, Integer> standbyCountsBefore =
            AssignmentTestUtils.clientTaskCount(clientStateMap, ClientState::standbyTaskCount);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 2, false, null);

        final long costBefore = assignor.standByTasksCost(taskIds, clientStateMap, 10, 1);
        Assert.assertEquals(60L, costBefore);

        final StandbyTaskAssignor standbyAssignor = StandbyTaskAssignorFactory.create(assignorConfig, assignor);
        Assertions.assertInstanceOf(ClientTagAwareStandbyTaskAssignor.class, standbyAssignor);

        // The bound method reference already null-checks its receiver, so no separate
        // getClass() call is needed before it.
        final long costAfter = assignor.optimizeStandbyTasks(
            clientStateMap, 10, 1, standbyAssignor::isAllowedTaskMovement);
        Assert.assertEquals(50L, costAfter);

        // NOTE(review): unlike the all-movable test, the boolean flag stays false here; its exact
        // meaning is defined in AssignmentTestUtils.verifyStandbySatisfyRackReplica - confirm there.
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 2, false, standbyCountsBefore);
    }

    /**
     * Builds a randomly generated cluster, process-rack mapping and client state, lets the
     * {@link StandbyTaskAssignor} from {@link StandbyTaskAssignorFactory} assign standby tasks,
     * and then checks that optimizing the standby assignment never increases the cost reported
     * by {@code standByTasksCost}.
     */
    @Test
    public void shouldOptimizeRandomStandby() {
        final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap =
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, false);
        final AssignmentConfigs assignorConfig = getRackAwareEnabledConfigWithStandby(3);

        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
            AssignmentTestUtils.getRandomCluster(50, 60, 3),
            taskTopicPartitionMap,
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, true),
            AssignmentTestUtils.getTopologyGroupTaskMap(),
            AssignmentTestUtils.getRandomProcessRacks(50, 50),
            AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog(50, 60, 3),
            assignorConfig,
            this.time
        );

        // The key set of a SortedMap is cast to SortedSet, exactly as the original code did.
        final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
        final SortedMap<ProcessId, ClientState> clientStateMap =
            AssignmentTestUtils.getRandomClientState(50, 60, 3, 3, taskIds);

        final StandbyTaskAssignor standbyAssignor = StandbyTaskAssignorFactory.create(assignorConfig, assignor);
        Assertions.assertInstanceOf(ClientTagAwareStandbyTaskAssignor.class, standbyAssignor);
        standbyAssignor.assign(clientStateMap, taskIds, taskIds, assignorConfig);

        // Snapshot of per-client standby counts taken after assignment, before optimization.
        final Map<ProcessId, Integer> standbyCountsBefore =
            AssignmentTestUtils.clientTaskCount(clientStateMap, ClientState::standbyTaskCount);

        Assert.assertTrue(assignor.canEnableRackAwareAssignor());
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 3, false, null);

        final long costBefore = assignor.standByTasksCost(taskIds, clientStateMap, 10, 1);
        MatcherAssert.assertThat(Long.valueOf(costBefore), Matchers.greaterThanOrEqualTo(0L));

        // The bound method reference already null-checks its receiver, so no separate
        // getClass() call is needed before it.
        final long costAfter = assignor.optimizeStandbyTasks(
            clientStateMap, 10, 1, standbyAssignor::isAllowedTaskMovement);
        MatcherAssert.assertThat(Long.valueOf(costAfter), Matchers.lessThanOrEqualTo(Long.valueOf(costBefore)));

        AssignmentTestUtils.verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, 3, false, standbyCountsBefore);
    }

    /**
     * Builds a {@link Cluster} with id "cluster" containing {@code NODE_0}, {@code NODE_1} and
     * {@code NODE_2} and the two partitions {@code PI_0_0} and {@code PI_0_1}; the two trailing
     * topic sets passed to the constructor are empty.
     */
    private Cluster getClusterForTopic0() {
        final Set<Node> nodes = Utils.mkSet(
            AssignmentTestUtils.NODE_0,
            AssignmentTestUtils.NODE_1,
            AssignmentTestUtils.NODE_2
        );
        final Set<PartitionInfo> partitions = Utils.mkSet(AssignmentTestUtils.PI_0_0, AssignmentTestUtils.PI_0_1);
        return new Cluster("cluster", nodes, partitions, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Builds a {@link Cluster} whose partition 0 of {@code TP_0_NAME} is led by {@code NODE_0}
     * and lists {@code NODE_0} and {@code NO_RACK_NODE} as both of its node-array arguments;
     * {@code PI_0_1} is included unchanged.
     */
    private Cluster getClusterWithPartitionMissingRack() {
        // The same array instance is intentionally used for both array parameters.
        final Node[] replicaNodes = new Node[] {AssignmentTestUtils.NODE_0, AssignmentTestUtils.NO_RACK_NODE};
        final PartitionInfo partitionWithNoRackNode = new PartitionInfo(
            AssignmentTestUtils.TP_0_NAME, 0, AssignmentTestUtils.NODE_0, replicaNodes, replicaNodes);

        final Set<Node> nodes = Utils.mkSet(
            AssignmentTestUtils.NODE_0,
            AssignmentTestUtils.NODE_1,
            AssignmentTestUtils.NODE_2
        );
        final Set<PartitionInfo> partitions = Utils.mkSet(partitionWithNoRackNode, AssignmentTestUtils.PI_0_1);
        return new Cluster("cluster", nodes, partitions, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Builds a {@link Cluster} whose node set additionally contains {@link Node#noNode()} and
     * whose only partition (partition 0 of {@code TP_0_NAME}) has a {@code null} leader and
     * empty node arrays.
     */
    private Cluster getClusterWithNoNode() {
        final Node noLeader = null;
        final PartitionInfo leaderlessPartition =
            new PartitionInfo(AssignmentTestUtils.TP_0_NAME, 0, noLeader, new Node[0], new Node[0]);

        final Set<Node> nodes = Utils.mkSet(
            AssignmentTestUtils.NODE_0,
            AssignmentTestUtils.NODE_1,
            AssignmentTestUtils.NODE_2,
            Node.noNode()
        );
        return new Cluster(
            "cluster",
            nodes,
            Collections.singleton(leaderlessPartition),
            Collections.emptySet(),
            Collections.emptySet()
        );
    }

    /**
     * Convenience overload that delegates to {@link #getProcessRacksForProcess0(boolean)} with
     * {@code false}, i.e. the variant in which the consumer's rack is present.
     */
    private Map<ProcessId, Map<String, Optional<String>>> getProcessRacksForProcess0() {
        final boolean emptyRack = false;
        return getProcessRacksForProcess0(emptyRack);
    }

    /**
     * Builds the rack mapping for {@code PID_1} with a single consumer named "consumer1".
     *
     * @param z when {@code true} the consumer's rack is {@link Optional#empty()}; otherwise it is
     *          {@code AssignmentTestUtils.RACK_1}
     * @return a new mutable map holding the single entry for {@code PID_1}
     */
    private Map<ProcessId, Map<String, Optional<String>>> getProcessRacksForProcess0(boolean z) {
        final Optional<String> consumerRack;
        if (z) {
            consumerRack = Optional.empty();
        } else {
            consumerRack = Optional.of(AssignmentTestUtils.RACK_1);
        }

        final Map<ProcessId, Map<String, Optional<String>>> racksByProcess = new HashMap<>();
        racksByProcess.put(AssignmentTestUtils.PID_1, Collections.singletonMap("consumer1", consumerRack));
        return racksByProcess;
    }

    /**
     * Builds a rack mapping in which {@code PID_1} is present but maps to an empty
     * consumer-to-rack map.
     */
    private Map<ProcessId, Map<String, Optional<String>>> getProcessWithNoConsumerRacks() {
        final Map<String, Optional<String>> noConsumers = Utils.mkMap();
        return Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.PID_1, noConsumers));
    }

    /**
     * Convenience overload that delegates to {@link #getTaskTopicPartitionMapForTask0(boolean)}
     * with {@code false}, i.e. the variant that maps {@code TASK_0_0} to {@code TP_0_0} only.
     */
    private Map<TaskId, Set<TopicPartition>> getTaskTopicPartitionMapForTask0() {
        final boolean includeSecondPartition = false;
        return getTaskTopicPartitionMapForTask0(includeSecondPartition);
    }

    /**
     * Builds a map that associates {@code TASK_0_0} with the single partition
     * {@code CHANGELOG_TP_0_0}.
     */
    private Map<TaskId, Set<TopicPartition>> getTaskChangeLogTopicPartitionMapForTask0() {
        final Set<TopicPartition> changelogPartitions = Utils.mkSet(AssignmentTestUtils.CHANGELOG_TP_0_0);
        return Utils.mkMap(Utils.mkEntry(AssignmentTestUtils.TASK_0_0, changelogPartitions));
    }

    /**
     * Builds a single-entry map from {@code TASK_0_0} to its topic partitions.
     *
     * @param z when {@code true}, {@code TP_1_0} is included in addition to {@code TP_0_0}
     * @return an immutable singleton map whose value is a mutable {@link HashSet}
     */
    private Map<TaskId, Set<TopicPartition>> getTaskTopicPartitionMapForTask0(boolean z) {
        final Set<TopicPartition> partitions = new HashSet<>(Collections.singleton(AssignmentTestUtils.TP_0_0));
        if (z) {
            partitions.add(AssignmentTestUtils.TP_1_0);
        }
        return Collections.singletonMap(AssignmentTestUtils.TASK_0_0, partitions);
    }
}
