/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server;

import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.metrics.ClientMetricsInstance;
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin;
import org.apache.kafka.server.metrics.ClientMetricsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientMetricsManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsManagerTest.class);
    private MockTime time;
    private Metrics kafkaMetrics;
    private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin;
    private ClientMetricsManager clientMetricsManager;

    @AfterAll
    public static void ensureNoThreadLeak() throws InterruptedException {
        TestUtils.waitForCondition(() -> Thread.getAllStackTraces().keySet().stream().map(Thread::getName).noneMatch(t -> t.contains("client-metrics-reaper") || t.contains("executor-")), (String)"Thread leak detected");
    }

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        this.kafkaMetrics = new Metrics();
        this.clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin();
        this.clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, (Time)this.time, 100, this.kafkaMetrics);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.clientMetricsManager.close();
        this.kafkaMetrics.close();
    }

    @Test
    public void testUpdateSubscription() throws Exception {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        Assertions.assertNotNull((Object)this.clientMetricsManager.subscriptionInfo("sub-1"));
        ClientMetricsManager.SubscriptionInfo subscriptionInfo = this.clientMetricsManager.subscriptionInfo("sub-1");
        Set metrics = subscriptionInfo.metrics();
        Assertions.assertEquals((int)"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency".split(",").length, (int)metrics.size());
        Arrays.stream("org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency".split(",")).forEach(metric -> Assertions.assertTrue((boolean)metrics.contains(metric)));
        Assertions.assertEquals((Object)ClientMetricsTestUtils.defaultProperties().getProperty("interval.ms"), (Object)String.valueOf(subscriptionInfo.intervalMs()));
        Assertions.assertEquals((int)ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), (int)subscriptionInfo.matchPattern().size());
        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> {
            String[] split = pattern.split("=");
            Assertions.assertTrue((boolean)subscriptionInfo.matchPattern().containsKey(split[0]));
            Assertions.assertEquals((Object)split[1], (Object)((Pattern)subscriptionInfo.matchPattern().get(split[0])).pattern());
        });
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        Assertions.assertEquals((int)4, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("instance-count").metricValue());
    }

    @Test
    public void testUpdateSubscriptionWithEmptyProperties() {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", new Properties());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testUpdateSubscriptionWithNullProperties() {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        Assertions.assertThrows(NullPointerException.class, () -> this.clientMetricsManager.updateSubscription("sub-1", null));
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testUpdateSubscriptionWithInvalidMetricsProperties() {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        Properties properties = new Properties();
        properties.put("random", "random");
        Assertions.assertThrows(InvalidRequestException.class, () -> this.clientMetricsManager.updateSubscription("sub-1", properties));
    }

    @Test
    public void testUpdateSubscriptionWithPropertiesDeletion() {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        Properties properties = new Properties();
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        Assertions.assertNotNull((Object)this.clientMetricsManager.subscriptionInfo("sub-1"));
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptionUpdateVersion());
        this.clientMetricsManager.updateSubscription("sub-1", new Properties());
        Assertions.assertEquals((int)0, (int)this.clientMetricsManager.subscriptions().size());
        Assertions.assertEquals((int)2, (int)this.clientMetricsManager.subscriptionUpdateVersion());
    }

    @Test
    public void testGetTelemetry() throws Exception {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull((Object)response.data().clientInstanceId());
        Assertions.assertTrue((response.data().subscriptionId() != 0 ? 1 : 0) != 0);
        Assertions.assertEquals((int)"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency".split(",").length, (int)response.data().requestedMetrics().size());
        Arrays.stream("org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency".split(",")).forEach(metric -> Assertions.assertTrue((boolean)response.data().requestedMetrics().contains(metric)));
        Assertions.assertEquals((int)4, (int)response.data().acceptedCompressionTypes().size());
        Assertions.assertEquals((byte)CompressionType.ZSTD.id, (Byte)((Byte)response.data().acceptedCompressionTypes().get(0)));
        Assertions.assertEquals((byte)CompressionType.LZ4.id, (Byte)((Byte)response.data().acceptedCompressionTypes().get(1)));
        Assertions.assertEquals((byte)CompressionType.GZIP.id, (Byte)((Byte)response.data().acceptedCompressionTypes().get(2)));
        Assertions.assertEquals((byte)CompressionType.SNAPPY.id, (Byte)((Byte)response.data().acceptedCompressionTypes().get(3)));
        Assertions.assertEquals((int)30000, (int)response.data().pushIntervalMs());
        Assertions.assertTrue((boolean)response.data().deltaTemporality());
        Assertions.assertEquals((int)100, (int)response.data().telemetryMaxBytes());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(response.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
        Assertions.assertEquals((int)12, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("unknown-subscription-request-rate").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("throttle-rate").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-export-rate").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-rate").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testGetTelemetryWithoutSubscription() throws UnknownHostException {
        Assertions.assertTrue((boolean)this.clientMetricsManager.subscriptions().isEmpty());
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull((Object)response.data().clientInstanceId());
        Assertions.assertTrue((response.data().subscriptionId() != 0 ? 1 : 0) != 0);
        Assertions.assertTrue((boolean)response.data().requestedMetrics().isEmpty());
        Assertions.assertEquals((int)4, (int)response.data().acceptedCompressionTypes().size());
        Assertions.assertEquals((int)300000, (int)response.data().pushIntervalMs());
        Assertions.assertTrue((boolean)response.data().deltaTemporality());
        Assertions.assertEquals((int)100, (int)response.data().telemetryMaxBytes());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(response.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
    }

    @Test
    public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull((Object)response.data().clientInstanceId());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        this.time.sleep(300000L);
        request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()), true).build();
        response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull((Object)response.data().clientInstanceId());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
    }

    @Test
    public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Properties properties = new Properties();
        properties.put("metrics", "*");
        this.clientMetricsManager.updateSubscription("sub-2", properties);
        Assertions.assertEquals((int)2, (int)this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertNotNull((Object)response.data().clientInstanceId());
        Assertions.assertTrue((response.data().subscriptionId() != 0 ? 1 : 0) != 0);
        Assertions.assertEquals((int)1, (int)response.data().requestedMetrics().size());
        Assertions.assertTrue((boolean)response.data().requestedMetrics().contains("*"));
        Assertions.assertEquals((int)4, (int)response.data().acceptedCompressionTypes().size());
        Assertions.assertEquals((int)30000, (int)response.data().pushIntervalMs());
        Assertions.assertTrue((boolean)response.data().deltaTemporality());
        Assertions.assertEquals((int)100, (int)response.data().telemetryMaxBytes());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(response.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
    }

    @Test
    public void testGetTelemetrySameClientImmediateRetryFail() throws Exception {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = response.data().clientInstanceId();
        Assertions.assertNotNull((Object)clientInstanceId);
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build();
        response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.THROTTLING_QUOTA_EXCEEDED, (Object)response.error());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Exception {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = response.data().clientInstanceId();
        Assertions.assertNotNull((Object)clientInstanceId);
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        try (Metrics kafkaMetrics = new Metrics();
             ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, (Time)this.time, kafkaMetrics);){
            PushTelemetryRequest pushRequest = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(response.data().clientInstanceId()).setSubscriptionId(response.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
            PushTelemetryResponse pushResponse = newClientMetricsManager.processPushTelemetryRequest(pushRequest, ClientMetricsTestUtils.requestContext());
            Assertions.assertEquals((Object)Errors.NONE, (Object)pushResponse.error());
            request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build();
            response = newClientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
            Assertions.assertEquals((Object)Errors.THROTTLING_QUOTA_EXCEEDED, (Object)response.error());
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "throttle-count").metricValue());
            Assertions.assertTrue(((Double)this.getMetric(kafkaMetrics, "throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        }
    }

    @Test
    public void testGetTelemetryUpdateSubscription() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Uuid clientInstanceId = response.data().clientInstanceId();
        int subscriptionId = response.data().subscriptionId();
        Assertions.assertNotNull((Object)clientInstanceId);
        Assertions.assertTrue((subscriptionId != 0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Properties properties = new Properties();
        properties.put("metrics", "*");
        this.clientMetricsManager.updateSubscription("sub-2", properties);
        Assertions.assertEquals((int)2, (int)this.clientMetricsManager.subscriptions().size());
        request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build();
        response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertTrue((subscriptionId != response.data().subscriptionId() ? 1 : 0) != 0);
    }

    @Test
    public void testGetTelemetryConcurrentRequestNewClientInstance() throws Exception {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build();
        CountDownLatch lock = new CountDownLatch(2);
        List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList());
        Thread thread = new Thread(() -> {
            try {
                GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        Thread thread1 = new Thread(() -> {
            try {
                GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        thread.start();
        thread1.start();
        Assertions.assertTrue((boolean)lock.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals((int)2, (int)responses.size());
        int throttlingErrorCount = 0;
        for (GetTelemetrySubscriptionsResponse response : responses) {
            if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                ++throttlingErrorCount;
                continue;
            }
            Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        }
        Assertions.assertEquals((int)1, (int)throttlingErrorCount);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void testGetTelemetryConcurrentRequestAfterSubscriptionUpdate() throws Exception {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        CountDownLatch lock = new CountDownLatch(2);
        List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        Thread thread = new Thread(() -> {
            try {
                GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        Thread thread1 = new Thread(() -> {
            try {
                GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        thread.start();
        thread1.start();
        Assertions.assertTrue((boolean)lock.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals((int)2, (int)responses.size());
        int throttlingErrorCount = 0;
        for (GetTelemetrySubscriptionsResponse response : responses) {
            if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                ++throttlingErrorCount;
                continue;
            }
            Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        }
        Assertions.assertEquals((int)1, (int)throttlingErrorCount);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void testPushTelemetry() throws Exception {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertFalse((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
        Assertions.assertEquals((int)12, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("plugin-export-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-rate").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryOnNewServer() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        try (Metrics kafkaMetrics = new Metrics();
             ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, (Time)this.time, kafkaMetrics);){
            PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
            PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
            Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
            Assertions.assertEquals((int)12, (int)kafkaMetrics.metrics().size());
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "instance-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "unknown-subscription-request-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "throttle-count").metricValue());
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "plugin-export-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "plugin-error-count").metricValue());
            Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-avg").metricValue());
            Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-max").metricValue());
        }
    }

    @Test
    public void testPushTelemetryAfterPushIntervalTime() throws UnknownHostException {
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        this.time.sleep(30000L);
        response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
    }

    @Test
    public void testPushTelemetryClientInstanceIdInvalid() throws UnknownHostException {
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(null), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)response.error());
        request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build();
        response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)response.error());
    }

    @Test
    public void testPushTelemetryThrottleError() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.THROTTLING_QUOTA_EXCEEDED, (Object)response.error());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertFalse((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.THROTTLING_QUOTA_EXCEEDED, (Object)instance.lastKnownError());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryTerminatingFlag() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)).setTerminating(true), true).build();
        response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertTrue((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
        Assertions.assertEquals((Object)2.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("plugin-export-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryNextRequestPostTerminatingFlag() throws UnknownHostException {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setTerminating(true), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertTrue((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
        request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setTerminating(true), true).build();
        response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)response.error());
        Assertions.assertTrue((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)instance.lastKnownError());
    }

    @Test
    public void testPushTelemetrySubscriptionIdInvalid() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)).setSubscriptionId(1234), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.UNKNOWN_SUBSCRIPTION_ID, (Object)response.error());
        Assertions.assertFalse((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.UNKNOWN_SUBSCRIPTION_ID, (Object)instance.lastKnownError());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryCompressionTypeInvalid() throws UnknownHostException {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType((byte)100), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.UNSUPPORTED_COMPRESSION_TYPE, (Object)response.error());
        Assertions.assertFalse((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.UNSUPPORTED_COMPRESSION_TYPE, (Object)instance.lastKnownError());
    }

    @Test
    public void testPushTelemetryNullMetricsData() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics(null), true).build();
        PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertFalse((boolean)instance.terminating());
        Assertions.assertEquals((Object)Errors.NONE, (Object)instance.lastKnownError());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryMetricsTooLarge() throws Exception {
        try (Metrics kafkaMetrics = new Metrics();
             ClientMetricsManager clientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 1, (Time)this.time, kafkaMetrics);){
            GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
            GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
            ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
            Assertions.assertNotNull((Object)instance);
            byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
            Assertions.assertEquals((int)2, (int)metrics.length);
            PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setMetrics(metrics), true).build();
            PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
            Assertions.assertEquals((Object)Errors.TELEMETRY_TOO_LARGE, (Object)response.error());
            Assertions.assertFalse((boolean)instance.terminating());
            Assertions.assertEquals((Object)Errors.TELEMETRY_TOO_LARGE, (Object)instance.lastKnownError());
        }
    }

    @Test
    public void testPushTelemetryConcurrentRequestNewClientInstance() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        CountDownLatch lock = new CountDownLatch(2);
        List<PushTelemetryResponse> responses = Collections.synchronizedList(new ArrayList());
        try (Metrics kafkaMetrics = new Metrics();
             ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(this.clientMetricsReceiverPlugin, 100, (Time)this.time, kafkaMetrics);){
            Thread thread = new Thread(() -> {
                try {
                    PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
                    responses.add(response);
                }
                catch (UnknownHostException e) {
                    LOG.error("Error processing request", (Throwable)e);
                }
                finally {
                    lock.countDown();
                }
            });
            Thread thread1 = new Thread(() -> {
                try {
                    PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
                    responses.add(response);
                }
                catch (UnknownHostException e) {
                    LOG.error("Error processing request", (Throwable)e);
                }
                finally {
                    lock.countDown();
                }
            });
            thread.start();
            thread1.start();
            Assertions.assertTrue((boolean)lock.await(2000L, TimeUnit.MILLISECONDS));
            Assertions.assertEquals((int)2, (int)responses.size());
            int throttlingErrorCount = 0;
            for (PushTelemetryResponse response : responses) {
                if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                    ++throttlingErrorCount;
                    continue;
                }
                Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
            }
            Assertions.assertEquals((int)1, (int)throttlingErrorCount);
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "throttle-count").metricValue());
            Assertions.assertTrue(((Double)this.getMetric(kafkaMetrics, "throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "plugin-export-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "plugin-error-count").metricValue());
            Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-avg").metricValue());
            Assertions.assertNotEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-max").metricValue());
        }
    }

    @Test
    public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws Exception {
        GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse subscriptionsResponse = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        CountDownLatch lock = new CountDownLatch(2);
        List<PushTelemetryResponse> responses = Collections.synchronizedList(new ArrayList());
        Thread thread = new Thread(() -> {
            try {
                PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        Thread thread1 = new Thread(() -> {
            try {
                PushTelemetryResponse response = this.clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
                responses.add(response);
            }
            catch (UnknownHostException e) {
                LOG.error("Error processing request", (Throwable)e);
            }
            finally {
                lock.countDown();
            }
        });
        thread.start();
        thread1.start();
        Assertions.assertTrue((boolean)lock.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals((int)2, (int)responses.size());
        int throttlingErrorCount = 0;
        for (PushTelemetryResponse response : responses) {
            if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) {
                ++throttlingErrorCount;
                continue;
            }
            Assertions.assertEquals((Object)Errors.UNKNOWN_SUBSCRIPTION_ID, (Object)response.error());
        }
        Assertions.assertEquals((int)1, (int)throttlingErrorCount);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("unknown-subscription-request-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("unknown-subscription-request-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("throttle-count").metricValue());
        Assertions.assertTrue(((Double)this.getMetric("throttle-rate").metricValue() > 0.0 ? 1 : 0) != 0);
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-export-count").metricValue());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("plugin-error-count").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-avg").metricValue());
        Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric("plugin-export-time-max").metricValue());
    }

    @Test
    public void testPushTelemetryPluginException() throws Exception {
        ClientMetricsReceiverPlugin receiverPlugin = (ClientMetricsReceiverPlugin)Mockito.mock(ClientMetricsReceiverPlugin.class);
        ((ClientMetricsReceiverPlugin)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("test exception")}).when((Object)receiverPlugin)).exportMetrics((RequestContext)Mockito.any(), (PushTelemetryRequest)Mockito.any());
        try (Metrics kafkaMetrics = new Metrics();
             ClientMetricsManager clientMetricsManager = new ClientMetricsManager(receiverPlugin, 100, (Time)this.time, 100, kafkaMetrics);){
            clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
            Assertions.assertEquals((int)1, (int)clientMetricsManager.subscriptions().size());
            GetTelemetrySubscriptionsRequest subscriptionsRequest = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
            GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionsRequest, ClientMetricsTestUtils.requestContext());
            ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
            Assertions.assertNotNull((Object)instance);
            PushTelemetryRequest request = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()).setSubscriptionId(subscriptionsResponse.data().subscriptionId()).setCompressionType(CompressionType.NONE.id).setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
            PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(request, ClientMetricsTestUtils.requestContext());
            Assertions.assertEquals((Object)Errors.INVALID_RECORD, (Object)response.error());
            Assertions.assertFalse((boolean)instance.terminating());
            Assertions.assertEquals((Object)Errors.INVALID_RECORD, (Object)instance.lastKnownError());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "unknown-subscription-request-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "throttle-count").metricValue());
            Assertions.assertEquals((Object)0.0, (Object)this.getMetric(kafkaMetrics, "plugin-export-count").metricValue());
            Assertions.assertEquals((Object)1.0, (Object)this.getMetric(kafkaMetrics, "plugin-error-count").metricValue());
            Assertions.assertTrue(((Double)this.getMetric(kafkaMetrics, "plugin-error-rate").metricValue() > 0.0 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-avg").metricValue());
            Assertions.assertEquals((Object)Double.NaN, (Object)this.getMetric(kafkaMetrics, "plugin-export-time-max").metricValue());
        }
    }

    @Test
    public void testCacheEviction() throws Exception {
        Properties properties = new Properties();
        properties.put("metrics", "*");
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertEquals((int)12, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertNotNull((Object)this.clientMetricsManager.clientInstance(response.data().clientInstanceId()));
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.expirationTimer().size());
        this.clientMetricsManager.expirationTimer().advanceClock(300L);
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofMillis(300L), () -> {
            while (this.clientMetricsManager.expirationTimer().size() != 0 || this.clientMetricsManager.clientInstance(response.data().clientInstanceId()) != null) {
                Thread.sleep(50L);
            }
        });
        Assertions.assertEquals((int)4, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("instance-count").metricValue());
    }

    @Test
    public void testCacheEvictionWithMultipleClients() throws Exception {
        Properties properties = new Properties();
        properties.put("metrics", "*");
        properties.put("interval.ms", "100");
        this.clientMetricsManager.updateSubscription("sub-1", properties);
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response1 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response1.error());
        GetTelemetrySubscriptionsResponse response2 = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response2.error());
        Assertions.assertEquals((int)20, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)2.0, (Object)this.getMetric("instance-count").metricValue());
        Assertions.assertNotNull((Object)this.clientMetricsManager.clientInstance(response1.data().clientInstanceId()));
        Assertions.assertNotNull((Object)this.clientMetricsManager.clientInstance(response2.data().clientInstanceId()));
        Assertions.assertEquals((int)2, (int)this.clientMetricsManager.expirationTimer().size());
        this.clientMetricsManager.expirationTimer().advanceClock(300L);
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofMillis(300L), () -> {
            while (this.clientMetricsManager.expirationTimer().size() != 0 || this.clientMetricsManager.clientInstance(response1.data().clientInstanceId()) != null || this.clientMetricsManager.clientInstance(response2.data().clientInstanceId()) != null) {
                Thread.sleep(50L);
            }
        });
        Assertions.assertEquals((int)4, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)0.0, (Object)this.getMetric("instance-count").metricValue());
    }

    @Test
    public void testCacheExpirationTaskCancelledOnInstanceUpdate() throws Exception {
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true).build();
        GetTelemetrySubscriptionsResponse response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Uuid clientInstanceId = response.data().clientInstanceId();
        int subscriptionId = response.data().subscriptionId();
        ClientMetricsInstance instance = this.clientMetricsManager.clientInstance(response.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertNotNull((Object)instance.expirationTimerTask());
        this.clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.subscriptions().size());
        request = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build();
        response = this.clientMetricsManager.processGetTelemetrySubscriptionRequest(request, ClientMetricsTestUtils.requestContext());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertTrue((subscriptionId != response.data().subscriptionId() ? 1 : 0) != 0);
        Assertions.assertNull((Object)instance.expirationTimerTask());
        instance = this.clientMetricsManager.clientInstance(response.data().clientInstanceId());
        Assertions.assertNotNull((Object)instance);
        Assertions.assertNotNull((Object)instance.expirationTimerTask());
        Assertions.assertEquals((int)1, (int)this.clientMetricsManager.expirationTimer().size());
        Assertions.assertEquals((int)12, (int)this.kafkaMetrics.metrics().size());
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("instance-count").metricValue());
    }

    private KafkaMetric getMetric(String name) throws Exception {
        return this.getMetric(this.kafkaMetrics, name);
    }

    private KafkaMetric getMetric(Metrics kafkaMetrics, String name) throws Exception {
        Optional<Map.Entry> metric = kafkaMetrics.metrics().entrySet().stream().filter(entry -> ((MetricName)entry.getKey()).name().equals(name)).findFirst();
        if (!metric.isPresent()) {
            throw new Exception(String.format("Could not find metric called %s", name));
        }
        return (KafkaMetric)metric.get().getValue();
    }
}

