/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.Assignment;
import org.apache.kafka.server.AssignmentsManager;
import org.apache.kafka.server.ControllerRequestCompletionHandler;
import org.apache.kafka.server.NodeToControllerChannelManager;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AssignmentsManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(AssignmentsManagerTest.class);

    // Topic IDs. The static initializer registers TOPIC_1 as "foo" and TOPIC_2 as "bar";
    // TOPIC_3 is never added to the test image.
    private static final Uuid TOPIC_1 = Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
    private static final Uuid TOPIC_2 = Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
    private static final Uuid TOPIC_3 = Uuid.fromString("ZeAwvYt-Ro2suQudGUdbRg");

    // Directory IDs used as assignment targets by the tests.
    private static final Uuid DIR_1 = Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
    private static final Uuid DIR_2 = Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
    private static final Uuid DIR_3 = Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");

    // Metadata image handed to the AssignmentsManager under test; assigned in the static initializer.
    private static final MetadataImage TEST_IMAGE;

    /**
     * Wraps the given response data in a {@link ClientResponse} whose body is an
     * {@link AssignReplicasToDirsResponse}. All other response fields are null, empty or zero.
     *
     * @param data the response payload to wrap
     * @return a non-empty Optional holding the fabricated response
     */
    static Optional<ClientResponse> mockClientResponse(AssignReplicasToDirsResponseData data) {
        AbstractResponse body = new AssignReplicasToDirsResponse(data);
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, false, null, null, body);
        return Optional.of(response);
    }

    /**
     * Creates a {@link TestEnv} and immediately closes it, verifying that start-up and
     * shutdown complete without throwing.
     */
    @Test
    public void testStartAndShutdown() throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            // Intentionally empty: constructing and closing the environment is the whole test.
        }
    }

    /**
     * Queues a single assignment of TOPIC_1 partition 0 to DIR_1, completes the resulting
     * request successfully, and checks that the pending count and the queued-assignments gauge
     * drop back to zero, the success callback ran once, and no global failure was recorded.
     */
    @Test
    public void testSuccessfulAssignment() throws Exception {
        try (TestEnv env = new TestEnv()) {
            TopicIdPartition partition = new TopicIdPartition(TOPIC_1, 0);

            Assertions.assertEquals(0, env.queuedReplicaToDirAssignments());
            env.onAssignment(partition, DIR_1);

            // Wait until the manager has picked the assignment up.
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(1, env.assignmentsManager.numPending());
                Assertions.assertEquals(1, env.queuedReplicaToDirAssignments());
            });
            Assertions.assertEquals(0, env.assignmentsManager.previousGlobalFailures());
            Assertions.assertEquals(1, env.assignmentsManager.numInFlight());

            env.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();

            // Wait until the successful response has been processed.
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(0, env.assignmentsManager.numPending());
                Assertions.assertEquals(0, env.queuedReplicaToDirAssignments());
                Assertions.assertEquals(1, env.success(partition));
            });
            Assertions.assertEquals(0, env.assignmentsManager.previousGlobalFailures());
        }
    }

    /**
     * Fails the first request (either with a response-level INVALID_REQUEST error or with a
     * timeout), checks that the assignment stays pending and one global failure is recorded,
     * then completes the next request successfully and checks that the failure count resets.
     *
     * @param failureType "invalidRequest" or "timeout"
     */
    @ParameterizedTest
    @ValueSource(strings={"invalidRequest", "timeout"})
    public void testUnSuccessfulRequestCausesRetransmission(String failureType) throws Exception {
        try (TestEnv env = new TestEnv()) {
            TopicIdPartition partition = new TopicIdPartition(TOPIC_1, 0);
            env.onAssignment(partition, DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60000L,
                () -> Assertions.assertEquals(1, env.assignmentsManager.numPending()));

            // Make the first request fail in the requested way.
            switch (failureType) {
                case "invalidRequest":
                    env.channelManager.completeCallback(req -> AssignmentsManagerTest.mockClientResponse(
                        new AssignReplicasToDirsResponseData().setErrorCode(Errors.INVALID_REQUEST.code())));
                    break;
                case "timeout":
                    env.channelManager.completeCallback(req -> Optional.empty());
                    break;
                default:
                    break;
            }

            // The assignment must still be pending and must not have been reported as successful.
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(1, env.assignmentsManager.numPending());
                Assertions.assertEquals(0, env.success(partition));
            });
            Assertions.assertEquals(1, env.assignmentsManager.previousGlobalFailures());

            // The retransmitted request succeeds.
            env.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(0, env.assignmentsManager.numPending());
                Assertions.assertEquals(1, env.success(partition));
            });
            Assertions.assertEquals(0, env.assignmentsManager.previousGlobalFailures());
        }
    }

    /**
     * Submits an assignment that does not match the test metadata image and checks that it is
     * neither left pending nor reported as successful.
     *
     * @param mismatchType "missingTopic" (TOPIC_3 is not in the image), "missingPartition"
     *                     (TOPIC_1 has no partition 2 in the image) or "notReplica"
     *                     (TOPIC_2 partition 0, whose replica list in the image is 0, 3, 2)
     */
    @ParameterizedTest
    @ValueSource(strings={"missingTopic", "missingPartition", "notReplica"})
    public void testMismatchedInputDoesNotTriggerCompletion(String mismatchType) throws Exception {
        try (TestEnv env = new TestEnv()) {
            final TopicIdPartition target;
            switch (mismatchType) {
                case "missingTopic":
                    target = new TopicIdPartition(TOPIC_3, 0);
                    break;
                case "missingPartition":
                    target = new TopicIdPartition(TOPIC_1, 2);
                    break;
                case "notReplica":
                    target = new TopicIdPartition(TOPIC_2, 0);
                    break;
                default:
                    throw new RuntimeException("invalid mismatchType argument.");
            }
            env.onAssignment(target, DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(0, env.assignmentsManager.numPending());
                Assertions.assertEquals(0, env.success(target));
            });
        }
    }

    /**
     * Queues assignments for TOPIC_1 partitions 0 and 1, then answers the request with a
     * success for partition 0 and a failure for partition 1. Afterwards exactly one assignment
     * must remain pending and in flight, only partition 0 must have been reported as successful,
     * and no global failure must have been recorded.
     *
     * @param failureType "missingResult" (partition 1 is omitted from the response) or
     *                    "errorResult" (partition 1 is answered with NOT_LEADER_OR_FOLLOWER)
     */
    @ParameterizedTest
    @ValueSource(strings={"missingResult", "errorResult"})
    public void testOneAssignmentFailsOneSucceeds(String failureType) throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            TopicIdPartition partition0 = new TopicIdPartition(TOPIC_1, 0);
            TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1, 1);
            testEnv.onAssignment(partition0, DIR_1);
            testEnv.onAssignment(partition1, DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(2, testEnv.assignmentsManager.numPending());
                Assertions.assertEquals(0, testEnv.success(partition0));
                Assertions.assertEquals(0, testEnv.success(partition1));
            });

            testEnv.channelManager.completeCallback(req -> {
                AssignReplicasToDirsRequestData.DirectoryData directoryData = req.directories().get(0);
                Assertions.assertEquals(DIR_1, directoryData.id());
                AssignReplicasToDirsRequestData.TopicData topicData = directoryData.topics().get(0);
                Assertions.assertEquals(TOPIC_1, topicData.topicId());

                // Only answer for partitions that were actually part of this request.
                HashSet<Integer> foundPartitions = new HashSet<>();
                topicData.partitions().forEach(p -> foundPartitions.add(p.partitionIndex()));

                ArrayList<AssignReplicasToDirsResponseData.PartitionData> partitions = new ArrayList<>();
                if (foundPartitions.contains(0)) {
                    partitions.add(new AssignReplicasToDirsResponseData.PartitionData()
                        .setPartitionIndex(0)
                        .setErrorCode((short) 0));
                }
                if (foundPartitions.contains(1)) {
                    switch (failureType) {
                        case "missingResult":
                            // Deliberately leave partition 1 out of the response.
                            break;
                        case "errorResult":
                            partitions.add(new AssignReplicasToDirsResponseData.PartitionData()
                                .setPartitionIndex(1)
                                .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()));
                            break;
                        default:
                            throw new RuntimeException("invalid failureType argument.");
                    }
                }

                AssignReplicasToDirsResponseData.TopicData responseTopic =
                    new AssignReplicasToDirsResponseData.TopicData()
                        .setTopicId(TOPIC_1)
                        .setPartitions(partitions);
                AssignReplicasToDirsResponseData.DirectoryData responseDirectory =
                    new AssignReplicasToDirsResponseData.DirectoryData()
                        .setId(DIR_1)
                        .setTopics(Arrays.asList(responseTopic));
                return AssignmentsManagerTest.mockClientResponse(
                    new AssignReplicasToDirsResponseData().setDirectories(Arrays.asList(responseDirectory)));
            });

            TestUtils.retryOnExceptionWithTimeout(60000L, () -> {
                Assertions.assertEquals(1, testEnv.assignmentsManager.numPending());
                Assertions.assertEquals(1, testEnv.assignmentsManager.numInFlight());
                Assertions.assertEquals(1, testEnv.success(partition0));
                Assertions.assertEquals(0, testEnv.success(partition1));
                Assertions.assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
            });
        }
    }

    /** An absent response must be reported as "Timeout". */
    @Test
    public void testGlobalResponseErrorTimeout() {
        Assertions.assertEquals(
            Optional.of("Timeout"),
            AssignmentsManager.globalResponseError(Optional.empty()));
    }

    /** A response carrying default (error-free) response data must yield no global error. */
    @Test
    public void testNoGlobalResponseError() {
        AbstractResponse body = new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData());
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, false, null, null, body);
        Assertions.assertEquals(
            Optional.empty(),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /** A response carrying an AuthenticationException must be reported as "AuthenticationException". */
    @Test
    public void testGlobalResponseErrorAuthenticationException() {
        ClientResponse response = new ClientResponse(
            null, null, "", 0L, 0L, false, null, new AuthenticationException("failed"), null);
        Assertions.assertEquals(
            Optional.of("AuthenticationException"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /** A response carrying an UnsupportedVersionException must be reported as "UnsupportedVersionException". */
    @Test
    public void testGlobalResponseErrorUnsupportedVersionException() {
        ClientResponse response = new ClientResponse(
            null, null, "", 0L, 0L, false, new UnsupportedVersionException("failed"), null, null);
        Assertions.assertEquals(
            Optional.of("UnsupportedVersionException"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /**
     * A response built with both boolean flags set must be reported as "Disonnected[Timeout]".
     */
    @Test
    public void testGlobalResponseErrorDisconnectedTimedOut() {
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, true, true, null, null, null);
        // NOTE(review): the spelling "Disonnected" presumably mirrors the message produced by
        // AssignmentsManager - confirm there before "correcting" it here.
        Assertions.assertEquals(
            Optional.of("Disonnected[Timeout]"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /** A response with no body, no exception and both boolean flags cleared must be reported as "EmptyResponse". */
    @Test
    public void testGlobalResponseErrorEmptyResponse() {
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, false, false, null, null, null);
        Assertions.assertEquals(
            Optional.of("EmptyResponse"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /** A response whose body is of an unexpected type (ApiVersionsResponse) must be reported as "ClassCastException". */
    @Test
    public void testGlobalResponseErrorClassCastException() {
        AbstractResponse body = new ApiVersionsResponse(new ApiVersionsResponseData());
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, false, false, null, null, body);
        Assertions.assertEquals(
            Optional.of("ClassCastException"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /** A top-level error code in the response data must be reported as "Response-level error: INVALID_REQUEST". */
    @Test
    public void testGlobalResponseErrorResponseLevelError() {
        AssignReplicasToDirsResponseData data =
            new AssignReplicasToDirsResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        AbstractResponse body = new AssignReplicasToDirsResponse(data);
        ClientResponse response = new ClientResponse(null, null, "", 0L, 0L, false, false, null, null, body);
        Assertions.assertEquals(
            Optional.of("Response-level error: INVALID_REQUEST"),
            AssignmentsManager.globalResponseError(Optional.of(response)));
    }

    /**
     * Checks that {@code AssignmentsManager.buildRequestData} groups a flat map of
     * partition-to-directory assignments into the nested directory / topic / partition
     * structure of the request, carrying the given broker ID and broker epoch.
     */
    @Test
    void testBuildRequestData() {
        LinkedHashMap<TopicIdPartition, Uuid> assignments = new LinkedHashMap<>();
        assignments.put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
        assignments.put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
        assignments.put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
        assignments.put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
        assignments.put(new TopicIdPartition(TOPIC_2, 5), DIR_2);

        // Wrap each target directory in an Assignment with a no-op success callback,
        // preserving the insertion order above.
        Map<TopicIdPartition, Assignment> targetAssignments = new LinkedHashMap<>();
        assignments.forEach((partition, directoryId) ->
            targetAssignments.put(partition, new Assignment(partition, directoryId, 0L, () -> {})));

        AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, targetAssignments);

        AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
            .setBrokerId(8)
            .setBrokerEpoch(100L)
            .setDirectories(Arrays.asList(
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_2)
                    .setTopics(Arrays.asList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(2))),
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_2)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(5))))),
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_3)
                    .setTopics(Collections.singletonList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(3))))),
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_1)
                    .setTopics(Collections.singletonList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Arrays.asList(
                                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(1),
                                new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(4)))))));

        Assertions.assertEquals(expected, built);
    }

    static {
        MetadataDelta delta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build();
        delta.replay(new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()));
        delta.replay(new RegisterBrokerRecord().setBrokerId(0).setIncarnationId(Uuid.fromString((String)"JJsH6zB0R7eKbr0Sy49ULw")).setBrokerEpoch(123L));
        delta.replay(new RegisterBrokerRecord().setBrokerId(1).setIncarnationId(Uuid.fromString((String)"DtnWclXyQ4qNDvL97JlnvQ")).setBrokerEpoch(456L));
        delta.replay(new RegisterBrokerRecord().setBrokerId(2).setIncarnationId(Uuid.fromString((String)"UFa_RKgLR4mxEXyquEPEmg")).setBrokerEpoch(789L));
        delta.replay(new RegisterBrokerRecord().setBrokerId(3).setIncarnationId(Uuid.fromString((String)"jj-cnHYASAmb_H9JR6nmtQ")).setBrokerEpoch(987L));
        delta.replay(new TopicRecord().setName("foo").setTopicId(TOPIC_1));
        delta.replay(new PartitionRecord().setPartitionId(0).setTopicId(TOPIC_1).setReplicas(Arrays.asList(0, 1, 2)).setIsr(Arrays.asList(0, 1, 2)).setLeader(1));
        delta.replay(new PartitionRecord().setPartitionId(1).setTopicId(TOPIC_1).setReplicas(Arrays.asList(1, 2, 3)).setIsr(Arrays.asList(1, 2, 3)).setLeader(1));
        delta.replay(new TopicRecord().setName("bar").setTopicId(TOPIC_2));
        delta.replay(new PartitionRecord().setPartitionId(0).setTopicId(TOPIC_2).setReplicas(Arrays.asList(0, 3, 2)).setIsr(Arrays.asList(0, 3, 2)).setLeader(1));
        delta.replay(new PartitionRecord().setPartitionId(1).setTopicId(TOPIC_2).setReplicas(Arrays.asList(1, 2, 3)).setIsr(Arrays.asList(2)).setLeader(2));
        delta.replay(new PartitionRecord().setPartitionId(2).setTopicId(TOPIC_2).setReplicas(Arrays.asList(3, 2, 1)).setIsr(Arrays.asList(3, 2, 1)).setLeader(3));
        TEST_IMAGE = delta.apply(MetadataProvenance.EMPTY);
    }

    static class TestEnv
    implements AutoCloseable {
        final ExponentialBackoff backoff;
        final MockNodeToControllerChannelManager channelManager;
        final MetricsRegistry metricsRegistry = new MetricsRegistry();
        final AssignmentsManager assignmentsManager;
        final Map<TopicIdPartition, Integer> successes;

        TestEnv() {
            this.backoff = new ExponentialBackoff(1L, 2, 4L, 0.0);
            this.channelManager = new MockNodeToControllerChannelManager();
            this.assignmentsManager = new AssignmentsManager(this.backoff, Time.SYSTEM, (NodeToControllerChannelManager)this.channelManager, 1, () -> TEST_IMAGE, t -> t.toString(), this.metricsRegistry);
            this.successes = new HashMap<TopicIdPartition, Integer>();
        }

        void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId) {
            this.assignmentsManager.onAssignment(topicIdPartition, directoryId, "test", () -> {
                Map<TopicIdPartition, Integer> map = this.successes;
                synchronized (map) {
                    this.successes.put(topicIdPartition, this.successes.getOrDefault(topicIdPartition, 0) + 1);
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        int success(TopicIdPartition topicIdPartition) {
            Map<TopicIdPartition, Integer> map = this.successes;
            synchronized (map) {
                return this.successes.getOrDefault(topicIdPartition, 0);
            }
        }

        void successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1() throws Exception {
            this.channelManager.completeCallback(req -> {
                AssignReplicasToDirsRequestData.DirectoryData directoryData = (AssignReplicasToDirsRequestData.DirectoryData)req.directories().get(0);
                Assertions.assertEquals((Object)DIR_1, (Object)directoryData.id());
                AssignReplicasToDirsRequestData.TopicData topicData = (AssignReplicasToDirsRequestData.TopicData)directoryData.topics().get(0);
                Assertions.assertEquals((Object)TOPIC_1, (Object)topicData.topicId());
                Assertions.assertEquals((int)0, (int)((AssignReplicasToDirsRequestData.PartitionData)topicData.partitions().get(0)).partitionIndex());
                return AssignmentsManagerTest.mockClientResponse(new AssignReplicasToDirsResponseData().setDirectories(Arrays.asList(new AssignReplicasToDirsResponseData.DirectoryData().setId(DIR_1).setTopics(Arrays.asList(new AssignReplicasToDirsResponseData.TopicData().setTopicId(TOPIC_1).setPartitions(Arrays.asList(new AssignReplicasToDirsResponseData.PartitionData().setPartitionIndex(0).setErrorCode((short)0))))))));
            });
        }

        Metric findMetric(MetricName name) {
            for (Map.Entry entry : this.metricsRegistry.allMetrics().entrySet()) {
                if (!name.equals(entry.getKey())) continue;
                return (Metric)entry.getValue();
            }
            throw new IllegalArgumentException("metric named " + name + " not found");
        }

        int queuedReplicaToDirAssignments() {
            Gauge queuedReplicaToDirAssignments = (Gauge)this.findMetric(AssignmentsManager.QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
            return (Integer)queuedReplicaToDirAssignments.value();
        }

        @Override
        public void close() throws Exception {
            try {
                this.assignmentsManager.close();
            }
            catch (Exception e) {
                LOG.error("error shutting down assignmentsManager", (Throwable)e);
            }
            try {
                this.metricsRegistry.shutdown();
            }
            catch (Exception e) {
                LOG.error("error shutting down metricsRegistry", (Throwable)e);
            }
        }
    }

    /**
     * Channel manager stand-in that never talks to a controller. Every request passed to
     * {@link #sendRequest} is queued together with its completion handler until a test
     * resolves it through {@link #completeCallback}.
     */
    static class MockNodeToControllerChannelManager implements NodeToControllerChannelManager {
        /** Requests that were sent but not yet completed, oldest first. */
        LinkedBlockingDeque<Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler>> callbacks =
            new LinkedBlockingDeque<>();

        MockNodeToControllerChannelManager() {
        }

        /** No-op. */
        public void start() {
        }

        /** No-op. */
        public void shutdown() {
        }

        /** @return always empty */
        public Optional<NodeApiVersions> controllerApiVersions() {
            return Optional.empty();
        }

        /**
         * Builds the request and queues its data together with the completion handler.
         *
         * @param request  builder of the request; the built request is cast to
         *                 {@link AssignReplicasToDirsRequest}
         * @param callback handler to invoke once a test completes the request
         */
        public void sendRequest(AbstractRequest.Builder<? extends AbstractRequest> request, ControllerRequestCompletionHandler callback) {
            AssignReplicasToDirsRequest inputRequest = (AssignReplicasToDirsRequest) request.build();
            synchronized (this) {
                this.callbacks.add(new AbstractMap.SimpleEntry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler>(
                    inputRequest.data(), callback));
            }
        }

        /** @return always 0 */
        public long getTimeoutMs() {
            return 0L;
        }

        /**
         * Blocks until a queued request is available, asks {@code completionist} for its outcome
         * and notifies the request's handler: {@code onComplete} when a response is produced,
         * {@code onTimeout} when the Optional is empty.
         *
         * @param completionist maps the request data to a response, or to empty for a timeout
         * @throws InterruptedException if interrupted while waiting for a request
         */
        void completeCallback(Function<AssignReplicasToDirsRequestData, Optional<ClientResponse>> completionist) throws InterruptedException {
            Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler> pending = this.callbacks.take();
            ControllerRequestCompletionHandler handler = pending.getValue();
            Optional<ClientResponse> clientResponse = completionist.apply(pending.getKey());
            if (clientResponse.isPresent()) {
                handler.onComplete(clientResponse.get());
            } else {
                handler.onTimeout();
            }
        }
    }
}

