/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class KafkaProducerTest {
    // Topic name held for the tests; the cluster fixtures below use the same "topic" literal.
    private final String topic = "topic";
    // Broker list containing only the single static NODE.
    private final Collection<Node> nodes = Collections.singletonList(NODE);
    // Cluster with a null cluster id, the single node, and no partitions at all.
    private final Cluster emptyCluster = new Cluster(null, this.nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
    // Cluster "dummy" exposing exactly one partition (0) of "topic"; the remaining PartitionInfo arguments are null.
    private final Cluster onePartitionCluster = new Cluster("dummy", this.nodes, Collections.singletonList(new PartitionInfo("topic", 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
    // Cluster "dummy" exposing partitions 0, 1 and 2 of "topic"; the remaining PartitionInfo arguments are null.
    private final Cluster threePartitionCluster = new Cluster("dummy", this.nodes, Arrays.asList(new PartitionInfo("topic", 0, null, null, null), new PartitionInfo("topic", 1, null, null, null), new PartitionInfo("topic", 2, null, null, null)), Collections.emptySet(), Collections.emptySet());
    // JUnit metadata for the currently running test; assigned in setup(TestInfo).
    private TestInfo testInfo;
    // 300000 ms (5 minutes). NOTE(review): usage is outside this view; presumably the metadata idle timeout - confirm.
    private static final int DEFAULT_METADATA_IDLE_MS = 300000;
    // The single broker node (id 0, host "host1", port 1000) shared by all cluster fixtures.
    private static final Node NODE = new Node(0, "host1", 1000);
    // Mutable, initially empty list of client ids. NOTE(review): populated/consumed outside this view - confirm purpose.
    private static final List<String> CLIENT_IDS = new ArrayList<String>();

    /**
     * Builds a {@link KafkaProducer} through the constructor that accepts injected
     * collaborators (metadata, network client, interceptors and clock).
     *
     * @param configs         producer configuration; the serializers are appended to it before
     *                        the {@link ProducerConfig} is created
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @param metadata        metadata instance handed to the producer
     * @param kafkaClient     network client handed to the producer
     * @param interceptors    interceptors handed to the producer (callers in this file pass {@code null})
     * @param time            clock handed to the producer
     * @return the newly constructed producer
     */
    private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
        ProducerConfig producerConfig = new ProducerConfig(
            ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer));
        return new KafkaProducer<>(producerConfig, keySerializer, valueSerializer, metadata, kafkaClient, interceptors, time);
    }

    /**
     * Runs before each test and stores the supplied {@link TestInfo} in the
     * {@code testInfo} field.
     *
     * @param testInfo metadata describing the test about to run
     */
    @BeforeEach
    public void setup(TestInfo testInfo) {
        this.testInfo = testInfo;
    }

    /**
     * Configuring only a {@code transactional.id} must yield a config with idempotence enabled,
     * acks equal to "-1"/"all", retries equal to {@link Integer#MAX_VALUE}, and a client id of
     * {@code "producer-" + transactional.id}.
     */
    @Test
    public void testOverwriteAcksAndRetriesForIdempotentProducers() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("transactional.id", "transactionalId");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        ProducerConfig config = new ProducerConfig(props);

        Assertions.assertTrue(config.getBoolean("enable.idempotence"));
        Assertions.assertTrue(Stream.of("-1", "all").anyMatch(each -> each.equalsIgnoreCase(config.getString("acks"))));
        // Expected value goes first so that a failure reports expected/actual the right way round.
        Assertions.assertEquals(Integer.MAX_VALUE, (int) config.getInt("retries"));
        Assertions.assertTrue(config.getString("client.id").equalsIgnoreCase("producer-" + config.getString("transactional.id")));
    }

    /**
     * Exercises the interaction between {@code acks}, {@code enable.idempotence} and
     * {@code transactional.id}: five accepted combinations are checked for their resulting
     * values, and three rejected combinations must raise {@link ConfigException}.
     */
    @Test
    public void testAcksAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // acks=0 with idempotence explicitly disabled.
        Properties validProps = new Properties();
        validProps.putAll(baseProps);
        validProps.setProperty("acks", "0");
        validProps.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(validProps);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be overwritten");
        Assertions.assertEquals("0", config.getString("acks"), "acks should be overwritten");

        // Only a transactional id: defaults apply.
        Properties validProps2 = new Properties();
        validProps2.putAll(baseProps);
        validProps2.setProperty("transactional.id", "transactionalId");
        config = new ProducerConfig(validProps2);
        Assertions.assertTrue(config.getBoolean("enable.idempotence"), "idempotence should be set with the default value");
        Assertions.assertEquals("-1", config.getString("acks"), "acks should be set with the default value");

        // acks=all with idempotence explicitly disabled; "all" is reported as "-1".
        Properties validProps3 = new Properties();
        validProps3.putAll(baseProps);
        validProps3.setProperty("acks", "all");
        validProps3.setProperty("enable.idempotence", "false");
        config = new ProducerConfig(validProps3);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be overwritten");
        Assertions.assertEquals("-1", config.getString("acks"), "acks should be overwritten");

        // acks=0 with idempotence left unset.
        Properties validProps4 = new Properties();
        validProps4.putAll(baseProps);
        validProps4.setProperty("acks", "0");
        config = new ProducerConfig(validProps4);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
        Assertions.assertEquals("0", config.getString("acks"), "acks should be set with overridden value");

        // acks=1 with idempotence left unset.
        Properties validProps5 = new Properties();
        validProps5.putAll(baseProps);
        validProps5.setProperty("acks", "1");
        config = new ProducerConfig(validProps5);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
        Assertions.assertEquals("1", config.getString("acks"), "acks should be set with overridden value");

        // Transactional id together with idempotence explicitly disabled.
        Properties invalidProps = new Properties();
        invalidProps.putAll(baseProps);
        invalidProps.setProperty("acks", "0");
        invalidProps.setProperty("enable.idempotence", "false");
        invalidProps.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps), "Cannot set a transactional.id without also enabling idempotence");

        // Idempotence explicitly enabled with acks=1.
        Properties invalidProps2 = new Properties();
        invalidProps2.putAll(baseProps);
        invalidProps2.setProperty("acks", "1");
        invalidProps2.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps2), "Must set acks to all in order to use the idempotent producer");

        // Transactional id with acks=0.
        Properties invalidProps3 = new Properties();
        invalidProps3.putAll(baseProps);
        invalidProps3.setProperty("acks", "0");
        invalidProps3.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps3), "Must set acks to all when using the transactional producer.");
    }

    /**
     * Exercises the interaction between {@code retries}, {@code enable.idempotence} and
     * {@code transactional.id}: two accepted combinations are checked for their resulting
     * values, and three rejected combinations must raise {@link ConfigException}.
     */
    @Test
    public void testRetriesAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // retries=0 with idempotence explicitly disabled.
        Properties validProps = new Properties();
        validProps.putAll(baseProps);
        validProps.setProperty("retries", "0");
        validProps.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(validProps);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be overwritten");
        Assertions.assertEquals(0, config.getInt("retries"), "retries should be overwritten");

        // retries=0 with idempotence left unset.
        Properties validProps2 = new Properties();
        validProps2.putAll(baseProps);
        validProps2.setProperty("retries", "0");
        config = new ProducerConfig(validProps2);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be disabled when retries set to 0 and `enable.idempotence` config is unset.");
        Assertions.assertEquals(0, config.getInt("retries"), "retries should be set with overridden value");

        // Transactional id together with idempotence explicitly disabled.
        Properties invalidProps = new Properties();
        invalidProps.putAll(baseProps);
        invalidProps.setProperty("retries", "0");
        invalidProps.setProperty("enable.idempotence", "false");
        invalidProps.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps), "Cannot set a transactional.id without also enabling idempotence");

        // Idempotence explicitly enabled with retries=0.
        Properties invalidProps2 = new Properties();
        invalidProps2.putAll(baseProps);
        invalidProps2.setProperty("retries", "0");
        invalidProps2.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps2), "Must set retries to non-zero when using the idempotent producer.");

        // Transactional id with retries=0.
        Properties invalidProps3 = new Properties();
        invalidProps3.putAll(baseProps);
        invalidProps3.setProperty("retries", "0");
        invalidProps3.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps3), "Must set retries to non-zero when using the transactional producer.");
    }

    /**
     * Exercises the interaction between {@code max.in.flight.requests.per.connection},
     * {@code enable.idempotence} and {@code transactional.id}: two accepted combinations are
     * checked for their resulting values, and three rejected combinations must raise
     * {@link ConfigException}.
     */
    @Test
    public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // Six in-flight requests with idempotence explicitly disabled.
        Properties validProps = new Properties();
        validProps.putAll(baseProps);
        validProps.setProperty("max.in.flight.requests.per.connection", "6");
        validProps.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(validProps);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be overwritten");
        Assertions.assertEquals(6, config.getInt("max.in.flight.requests.per.connection"), "max.in.flight.requests.per.connection should be overwritten");

        // Six in-flight requests with idempotence left unset.
        Properties validProps2 = new Properties();
        validProps2.putAll(baseProps);
        validProps2.setProperty("max.in.flight.requests.per.connection", "6");
        config = new ProducerConfig(validProps2);
        Assertions.assertFalse(config.getBoolean("enable.idempotence"), "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and `enable.idempotence` config is unset.");
        Assertions.assertEquals(6, config.getInt("max.in.flight.requests.per.connection"), "`max.in.flight.requests.per.connection` should be set with overridden value");

        // Transactional id together with idempotence explicitly disabled.
        Properties invalidProps = new Properties();
        invalidProps.putAll(baseProps);
        invalidProps.setProperty("max.in.flight.requests.per.connection", "5");
        invalidProps.setProperty("enable.idempotence", "false");
        invalidProps.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps), "Cannot set a transactional.id without also enabling idempotence");

        // Idempotence explicitly enabled with six in-flight requests.
        Properties invalidProps2 = new Properties();
        invalidProps2.putAll(baseProps);
        invalidProps2.setProperty("max.in.flight.requests.per.connection", "6");
        invalidProps2.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps2), "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");

        // Transactional id with six in-flight requests. The failure message now names the
        // setting actually under test (it previously repeated the "retries" wording).
        Properties invalidProps3 = new Properties();
        invalidProps3.putAll(baseProps);
        invalidProps3.setProperty("max.in.flight.requests.per.connection", "6");
        invalidProps3.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(ConfigException.class, () -> new ProducerConfig(invalidProps3), "Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer.");
    }

    /**
     * With a {@link MockMetricsReporter} configured and no explicit client id, the producer
     * must expose three metrics reporters, and the mock reporter must have been given the
     * producer's client id.
     */
    @Test
    public void testMetricsReporterAutoGeneratedClientId() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        // try-with-resources closes the producer even when an assertion fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(3, producer.metrics.reporters().size());
            MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
                .filter(reporter -> reporter instanceof MockMetricsReporter)
                .findFirst()
                .get();
            Assertions.assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
        }
    }

    /**
     * With {@code auto.include.jmx.reporter=false} and {@code enable.metrics.push=false} the
     * producer must have no metrics reporters.
     */
    @Test
    public void testDisableJmxAndClientTelemetryReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("auto.include.jmx.reporter", "false");
        props.setProperty("enable.metrics.push", "false");
        // try-with-resources closes the producer even when the assertion fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertTrue(producer.metrics.reporters().isEmpty());
        }
    }

    /**
     * With {@link JmxReporter} listed in {@code metric.reporters} and
     * {@code enable.metrics.push=false}, the producer must have exactly one reporter and it
     * must be a {@link JmxReporter}.
     */
    @Test
    public void testExplicitlyOnlyEnableJmxReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");
        props.setProperty("enable.metrics.push", "false");
        // try-with-resources closes the producer even when an assertion fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(1, producer.metrics.reporters().size());
            Assertions.assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
        }
    }

    /**
     * With {@code auto.include.jmx.reporter=false} and nothing else changed, the producer must
     * have exactly one reporter and it must be a {@link ClientTelemetryReporter}.
     */
    @Test
    public void testExplicitlyOnlyEnableClientTelemetryReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("auto.include.jmx.reporter", "false");
        // try-with-resources closes the producer even when an assertion fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(1, producer.metrics.reporters().size());
            Assertions.assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
        }
    }

    /**
     * Constructing a producer from {@link Properties} plus explicit serializer instances, and
     * closing it immediately, must not throw.
     */
    @Test
    public void testConstructorWithSerializers() {
        Properties bootstrapOnly = new Properties();
        bootstrapOnly.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer<>(bootstrapOnly, new ByteArraySerializer(), new ByteArraySerializer()).close();
    }

    /**
     * Constructing a producer without any serializer configuration must raise
     * {@link ConfigException}, both for the {@link Properties} and the {@link Map} constructor.
     */
    @Test
    public void testNoSerializerProvided() {
        Properties propsWithoutSerializers = new Properties();
        propsWithoutSerializers.put("bootstrap.servers", "localhost:9000");
        Assertions.assertThrows(ConfigException.class, () -> new KafkaProducer<>(propsWithoutSerializers));

        Map<String, Object> mapWithoutSerializers = new HashMap<>();
        mapWithoutSerializers.put("bootstrap.servers", "localhost:9999");
        Assertions.assertThrows(ConfigException.class, () -> new KafkaProducer<>(mapWithoutSerializers));
    }

    /**
     * When producer construction fails with a {@link KafkaException}, the configured
     * {@link MockMetricsReporter} must have been initialised once and closed once, and the
     * exception must carry the message "Failed to construct kafka producer".
     */
    @Test
    public void testConstructorFailureCloseResource() {
        Properties props = new Properties();
        props.setProperty("client.id", "testConstructorClose");
        props.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());

        // Snapshot the static counters so the test only checks the delta it causes.
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();
        final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();

        try (KafkaProducer<byte[], byte[]> ignored = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assertions.assertEquals(initCountBefore + 1, MockMetricsReporter.INIT_COUNT.get());
            Assertions.assertEquals(closeCountBefore + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assertions.assertEquals("Failed to construct kafka producer", e.getMessage());
        }
    }

    /**
     * A {@link Properties} object containing a non-String key must be rejected with a
     * {@link ConfigException} whose message contains "not string key".
     */
    @Test
    public void testConstructorWithNotStringKey() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.put(1, "not string key");

        ConfigException thrown = Assertions.assertThrows(
            ConfigException.class,
            () -> new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()));

        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("not string key"), "Unexpected exception message: " + thrown.getMessage());
    }

    /**
     * Constructing a producer with two {@link MockSerializer} instances must raise the
     * serializer init counter by two without touching the close counter; closing the producer
     * must then raise the close counter by two.
     */
    @Test
    public void testSerializerClose() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("client.id", "testConstructorClose");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("metric.reporters", MockMetricsReporter.class.getName());
        configs.put("security.protocol", "PLAINTEXT");

        // Snapshot the static counters so the test only checks the delta it causes.
        final int initCountBefore = MockSerializer.INIT_COUNT.get();
        final int closeCountBefore = MockSerializer.CLOSE_COUNT.get();

        KafkaProducer<?, ?> producer = new KafkaProducer<>(configs, new MockSerializer(), new MockSerializer());
        Assertions.assertEquals(initCountBefore + 2, MockSerializer.INIT_COUNT.get());
        Assertions.assertEquals(closeCountBefore, MockSerializer.CLOSE_COUNT.get());

        producer.close();
        Assertions.assertEquals(initCountBefore + 2, MockSerializer.INIT_COUNT.get());
        Assertions.assertEquals(closeCountBefore + 2, MockSerializer.CLOSE_COUNT.get());
    }

    /**
     * A configured {@link MockProducerInterceptor} must be initialised once on construction
     * and closed once when the producer is closed; no cluster metadata must have been
     * delivered to it right after construction. The interceptor counters are reset afterwards.
     */
    @Test
    public void testInterceptorConstructClose() {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
            props.setProperty("mock.interceptor.append", "something");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            Assertions.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
            Assertions.assertNull(MockProducerInterceptor.CLUSTER_META.get());

            // Closing is part of what is being verified, so it stays explicit.
            producer.close();
            Assertions.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            // Static counters are shared across tests; always restore them.
            MockProducerInterceptor.resetCounters();
        }
    }

    /**
     * Configures {@code targetInterceptor} copies of {@link MockProducerInterceptor} and sets
     * the interceptor's throw-on-config threshold to that same number. Producer construction
     * must fail with {@link KafkaException}, and both the configure counter and the close
     * counter must equal {@code targetInterceptor}. The counters are reset afterwards.
     */
    @Test
    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
        // Number of interceptor instances configured; also the threshold at which configure throws.
        final int targetInterceptor = 3;
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            // Produces "<name>, <name>, <name>" for targetInterceptor == 3.
            props.setProperty("interceptor.classes", String.join(", ", Collections.nCopies(targetInterceptor, MockProducerInterceptor.class.getName())));
            props.setProperty("mock.interceptor.append", "something");
            MockProducerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);

            Assertions.assertThrows(KafkaException.class, () -> new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()));

            Assertions.assertEquals(targetInterceptor, MockProducerInterceptor.CONFIG_COUNT.get());
            Assertions.assertEquals(targetInterceptor, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            // Static counters are shared across tests; always restore them.
            MockProducerInterceptor.resetCounters();
        }
    }

    /**
     * A configured {@link MockPartitioner} must be initialised once on construction and closed
     * once when the producer is closed. The partitioner counters are reset before the producer
     * is built and again afterwards.
     */
    @Test
    public void testPartitionerClose() {
        try {
            MockPartitioner.resetCounters();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("partitioner.class", MockPartitioner.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            Assertions.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assertions.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());

            // Closing is part of what is being verified, so it stays explicit.
            producer.close();
            Assertions.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assertions.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
        } finally {
            // Static counters are shared across tests; always restore them.
            MockPartitioner.resetCounters();
        }
    }

    /**
     * Verifies that {@code close()} blocks while a sent record is still outstanding and that
     * interrupting the closing thread surfaces as an {@link InterruptException}.
     *
     * <p>A background task sends one record and then calls {@code close()}. The test expects
     * that task not to finish within 100 ms, waits until the mock client has seen one request,
     * cancels the task with interruption, and finally checks the exception captured from
     * {@code close()}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @throws Exception if waiting on the future or on the condition fails unexpectedly
     */
    @Test
    public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("partitioner.class", MockPartitioner.class.getName());
        configs.put("batch.size", "1");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        // Seed the mock client with metadata for "topic" before the producer is built.
        client.updateMetadata(initialUpdateResponse);
        KafkaProducer producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Carries whatever close() throws on the worker thread back to the test thread.
        AtomicReference closeException = new AtomicReference();
        try {
            Future<?> future = executor.submit(() -> {
                producer.send(new ProducerRecord("topic", (Object)"key", (Object)"value"));
                try {
                    producer.close();
                    Assertions.fail((String)"Close should block and throw.");
                }
                catch (Exception e) {
                    closeException.set(e);
                }
            });
            try {
                // The task is expected to still be running, so this get() should time out.
                future.get(100L, TimeUnit.MILLISECONDS);
                Assertions.fail((String)"Close completed without waiting for send");
            }
            catch (TimeoutException timeoutException) {
                // Expected: the timeout shows that the task has not completed yet.
            }
            // Wait (up to 1000 ms) until the mock client has one request.
            client.waitForRequests(1, 1000L);
            // cancel(true) interrupts the worker thread that is running the task.
            Assertions.assertTrue((boolean)future.cancel(true), (String)"Close terminated prematurely");
            TestUtils.waitForCondition(() -> closeException.get() != null, "InterruptException did not occur within timeout.");
            Assertions.assertInstanceOf(InterruptException.class, closeException.get(), (String)("Expected exception not thrown " + closeException));
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * A value of -1 for both {@code send.buffer.bytes} and {@code receive.buffer.bytes} must
     * be accepted: the producer is constructed and closed without throwing.
     */
    @Test
    public void testOsDefaultSocketBufferSizes() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9999");
        settings.put("send.buffer.bytes", -1);
        settings.put("receive.buffer.bytes", -1);
        new KafkaProducer<>(settings, new ByteArraySerializer(), new ByteArraySerializer()).close();
    }

    /**
     * A {@code send.buffer.bytes} value of -2 must make producer construction fail with a
     * {@link KafkaException}.
     */
    @Test
    public void testInvalidSocketSendBufferSize() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9999");
        settings.put("send.buffer.bytes", -2);
        Assertions.assertThrows(
            KafkaException.class,
            () -> new KafkaProducer<>(settings, new ByteArraySerializer(), new ByteArraySerializer()));
    }

    /**
     * A {@code receive.buffer.bytes} value of -2 must make producer construction fail with a
     * {@link KafkaException}.
     */
    @Test
    public void testInvalidSocketReceiveBufferSize() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9999");
        settings.put("receive.buffer.bytes", -2);
        Assertions.assertThrows(
            KafkaException.class,
            () -> new KafkaProducer<>(settings, new ByteArraySerializer(), new ByteArraySerializer()));
    }

    /**
     * Convenience overload of {@link #producerWithOverrideNewSender(Map, ProducerMetadata, Time)}
     * that passes {@link Time#SYSTEM} as the producer's clock.
     *
     * @param configs  producer configuration
     * @param metadata metadata instance handed to the producer
     * @return the producer built by the three-argument overload
     */
    private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> configs, ProducerMetadata metadata) {
        Time systemClock = Time.SYSTEM;
        return KafkaProducerTest.producerWithOverrideNewSender(configs, metadata, systemClock);
    }

    /**
     * Builds a String/String producer around the supplied metadata so that tests can count
     * the calls the producer itself makes on that metadata object.
     *
     * <p>Two pieces are overridden: the {@link MockClient} always reports {@code NODE} as the
     * least-loaded node, and the producer's sender is created with its own, separate metadata
     * instance instead of the one passed in.
     *
     * @param configs  producer configuration; the String serializers are appended to it
     * @param metadata metadata instance handed to the producer (typically a mock)
     * @param time     clock passed to the producer
     * @return a producer whose sender does not touch {@code metadata}
     */
    private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> configs, ProducerMetadata metadata, Time time) {
        // NOTE(review): the mock client is built on Time.SYSTEM rather than the supplied
        // `time`; kept as is — confirm whether that is intentional.
        MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {

            @Override
            public LeastLoadedNode leastLoadedNode(long now) {
                return new LeastLoadedNode(NODE, true);
            }
        };
        ProducerConfig producerConfig = new ProducerConfig(
                ProducerConfig.appendSerializerToConfig(configs, new StringSerializer(), new StringSerializer()));
        return new KafkaProducer<String, String>(producerConfig, new StringSerializer(), new StringSerializer(), metadata, mockClient, null, time) {

            // @Override makes the compiler flag any signature drift that would otherwise turn
            // this into an unused overload and silently disable the isolation.
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                // Deliberately ignore the metadata argument: the sender gets a fresh instance.
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.newMetadata(0L, 0L, 100000L));
            }
        };
    }

    /**
     * Verifies how often the producer consults its metadata while the topic is still unknown.
     *
     * <p>{@code metadata.fetch()} is stubbed to return {@code emptyCluster} four times and
     * {@code onePartitionCluster} afterwards. The cumulative call counts on the mock are then
     * checked after a first send, a second send, and a {@code partitionsFor} call.
     *
     * @param isIdempotenceEnabled value used for {@code enable.idempotence}
     * @throws InterruptedException declared by the verified {@code awaitUpdate} call
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataFetch(boolean isIdempotenceEnabled) throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("enable.idempotence", isIdempotenceEnabled);
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        Mockito.when(metadata.fetch()).thenReturn(this.emptyCluster, this.emptyCluster, this.emptyCluster, this.emptyCluster, this.onePartitionCluster);
        KafkaProducer<String, String> producer = KafkaProducerTest.producerWithOverrideNewSender(configs, metadata);
        // Close in finally so a failed verification does not leave the producer running.
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

            // First send: four update rounds, five fetches.
            producer.send(record);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            // Second send: no further updates, one more fetch.
            producer.send(record, null);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(6)).fetch();

            // partitionsFor: again only one additional fetch.
            producer.partitionsFor("topic");
            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(7)).fetch();
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that the producer asks for a metadata update when a previously known topic
     * disappears from the fetched cluster.
     *
     * <p>{@code metadata.fetch()} is stubbed to return {@code onePartitionCluster}, then
     * {@code emptyCluster}, then {@code onePartitionCluster} again. The first send must not
     * trigger an update; the second must trigger exactly one.
     *
     * @param isIdempotenceEnabled value used for {@code enable.idempotence}
     * @throws InterruptedException declared by the verified {@code awaitUpdate} call
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataExpiry(boolean isIdempotenceEnabled) throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("enable.idempotence", isIdempotenceEnabled);
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        Mockito.when(metadata.fetch()).thenReturn(this.onePartitionCluster, this.emptyCluster, this.onePartitionCluster);
        KafkaProducer<String, String> producer = KafkaProducerTest.producerWithOverrideNewSender(configs, metadata);
        // Close in finally so a failed verification does not leave the producer running.
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

            // Topic is present on the first fetch: no update requested.
            producer.send(record);
            Mockito.verify(metadata, Mockito.times(0)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(0)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(1)).fetch();

            // Second fetch returns the empty cluster: one update round, two more fetches.
            producer.send(record, null);
            Mockito.verify(metadata, Mockito.times(1)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(1)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(3)).fetch();
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that a send fails with a Kafka {@code TimeoutException} when the topic never
     * shows up in the metadata before {@code max.block.ms} (60000 ms here) has elapsed.
     *
     * <p>{@code metadata.fetch()} always returns {@code emptyCluster}; on its fifth invocation
     * the mock clock is moved forward by 70000 ms, which is more than the configured block time.
     *
     * @param isIdempotenceEnabled value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataTimeoutWithMissingTopic(boolean isIdempotenceEnabled) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", isIdempotenceEnabled);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Jump past max.block.ms on the fifth fetch.
            if (invocationCount.incrementAndGet() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return this.emptyCluster;
        });
        KafkaProducer<String, String> producer = KafkaProducerTest.producerWithOverrideNewSender(configs, metadata, mockTime);
        // send() and the verifications are inside the try so that the producer is closed
        // even when one of them fails.
        try {
            Future<RecordMetadata> future = producer.send(record);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();
            ExecutionException failure = Assertions.assertThrows(ExecutionException.class, future::get);
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, failure.getCause());
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that sending to partition 2 keeps refreshing metadata until the fetched cluster
     * is {@code threePartitionCluster}.
     *
     * <p>{@code metadata.fetch()} is stubbed to return {@code onePartitionCluster} twice and
     * {@code threePartitionCluster} afterwards; the test expects two update rounds and three
     * fetches.
     *
     * @param isIdempotenceEnabled value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", isIdempotenceEnabled);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        Mockito.when(metadata.fetch()).thenReturn(this.onePartitionCluster, this.onePartitionCluster, this.threePartitionCluster);
        KafkaProducer<String, String> producer = KafkaProducerTest.producerWithOverrideNewSender(configs, metadata, mockTime);
        // Close in finally so a failed verification does not leave the producer running.
        try {
            producer.send(record);
            Mockito.verify(metadata, Mockito.times(2)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(2)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(3)).fetch();
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that a send to partition 2 fails with a Kafka {@code TimeoutException} when the
     * metadata keeps returning {@code onePartitionCluster} until {@code max.block.ms}
     * (60000 ms here) has elapsed.
     *
     * <p>On the fifth {@code metadata.fetch()} invocation the mock clock is moved forward by
     * 70000 ms, which is more than the configured block time.
     *
     * @param isIdempotenceEnabled value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMetadataTimeoutWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", isIdempotenceEnabled);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Jump past max.block.ms on the fifth fetch.
            if (invocationCount.incrementAndGet() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return this.onePartitionCluster;
        });
        KafkaProducer<String, String> producer = KafkaProducerTest.producerWithOverrideNewSender(configs, metadata, mockTime);
        // send() and the verifications are inside the try so that the producer is closed
        // even when one of them fails.
        try {
            Future<RecordMetadata> future = producer.send(record);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();
            ExecutionException failure = Assertions.assertThrows(ExecutionException.class, future::get);
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, failure.getCause());
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that {@code partitionsFor} times out, with an
     * {@link UnknownTopicOrPartitionException} as cause, when every metadata update reports the
     * topic as unknown.
     *
     * <p>A helper thread repeatedly feeds an {@code UNKNOWN_TOPIC_OR_PARTITION} response into
     * the metadata and advances the mock clock by 60000 ms per round; {@code max.block.ms} is
     * 600000 ms.
     *
     * @throws InterruptedException if joining the helper thread is interrupted
     */
    @Test
    public void testTopicRefreshInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "600000");
        configs.put("enable.idempotence", false);
        long refreshBackoffMs = 500L;
        long refreshBackoffMaxMs = 5000L;
        long metadataExpireMs = 60000L;
        long metadataIdleMs = 60000L;
        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs, new LogContext(), new ClusterResourceListeners(), time);
        String topic = "topic";
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
            AtomicBoolean running = new AtomicBoolean(true);
            Thread t = new Thread(() -> {
                long startTimeMs = System.currentTimeMillis();
                while (running.get()) {
                    // Wait for an update request, but for at most 100 ms of wall-clock time
                    // measured from thread start.
                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 100L) {
                        Thread.yield();
                    }
                    MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
                    time.sleep(60000L);
                }
            });
            t.start();
            try {
                Throwable throwable = Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.partitionsFor(topic));
                Assertions.assertInstanceOf(UnknownTopicOrPartitionException.class, throwable.getCause());
            } finally {
                // Always stop the helper thread; otherwise a failed assertion would leave a
                // non-daemon thread spinning forever.
                running.set(false);
                t.join();
            }
        }
    }

    /**
     * Verifies that {@code partitionsFor} times out, with an
     * {@link UnknownTopicOrPartitionException} as cause, for a topic the metadata reports as
     * unknown.
     *
     * <p>A helper thread first publishes an {@code UNKNOWN_TOPIC_OR_PARTITION} update, meets
     * the test thread at the exchanger, waits until a metadata update has been requested and
     * then advances the mock clock by 30000 ms (the configured {@code max.block.ms}).
     *
     * @throws InterruptedException if an exchange or the join is interrupted
     */
    @Test
    public void testTopicNotExistingInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "30000");
        long backoffMs = 500L;
        long backoffMaxMs = 5000L;
        long expireMs = 60000L;
        long idleMs = 60000L;
        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(backoffMs, backoffMaxMs, expireMs, idleMs, new LogContext(), new ClusterResourceListeners(), time);
        String topic = "topic";
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
            Exchanger<Object> rendezvous = new Exchanger<>();
            Thread updater = new Thread(() -> {
                try {
                    MetadataResponse unknownTopic = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
                    metadata.updateWithCurrentRequestVersion(unknownTopic, false, time.milliseconds());
                    // Let the test thread proceed to partitionsFor().
                    rendezvous.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    time.sleep(30000L);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            });
            updater.start();
            rendezvous.exchange(null);
            Throwable thrown = Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.partitionsFor(topic));
            Assertions.assertInstanceOf(UnknownTopicOrPartitionException.class, thrown.getCause());
            updater.join();
        }
    }

    /**
     * Verifies that {@code partitionsFor} first succeeds for a known topic and later times out
     * after the mock clock has been advanced by 120000 ms and another update published.
     *
     * <p>The helper thread and the test thread are kept in lock-step through three exchanger
     * hand-offs; the statement order on both sides is significant.
     *
     * @throws InterruptedException if an exchange or the join is interrupted
     */
    @Test
    public void testTopicExpiryInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "30000");
        long backoffMs = 500L;
        long backoffMaxMs = 5000L;
        long expireMs = 60000L;
        long idleMs = 60000L;
        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(backoffMs, backoffMaxMs, expireMs, idleMs, new LogContext(), new ClusterResourceListeners(), time);
        String topic = "topic";
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
            Exchanger<Object> rendezvous = new Exchanger<>();
            Thread updater = new Thread(() -> {
                try {
                    // Hand-off 1: the test thread is about to call partitionsFor().
                    rendezvous.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    MetadataResponse firstUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
                    metadata.updateWithCurrentRequestVersion(firstUpdate, false, time.milliseconds());
                    // Hand-off 2: the first partitionsFor() has returned.
                    rendezvous.exchange(null);
                    time.sleep(120000L);
                    MetadataResponse secondUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
                    metadata.updateWithCurrentRequestVersion(secondUpdate, false, time.milliseconds());
                    // Hand-off 3: the test thread may issue the second partitionsFor().
                    rendezvous.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    time.sleep(30000L);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            });
            updater.start();
            rendezvous.exchange(null);
            Assertions.assertNotNull(producer.partitionsFor(topic));
            rendezvous.exchange(null);
            rendezvous.exchange(null);
            Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.partitionsFor(topic));
            updater.join();
        }
    }

    /**
     * Runs the shared header checks with mocks of the plain {@link Serializer} interface.
     */
    @Test
    public void testHeaders() {
        doTestHeaders(Serializer.class);
    }

    /**
     * Sends one record carrying a header through a producer with mocked serializers and checks
     * that the record's headers can no longer be modified after the send, that the header
     * value is unchanged, and that both serializers were invoked with the record's headers.
     *
     * @param serializerClassToMock serializer type to mock for both key and value
     * @param <T> the serializer type
     */
    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        Serializer<String> keySerializer = Mockito.mock(serializerClassToMock);
        Serializer<String> valueSerializer = Mockito.mock(serializerClassToMock);
        long nowMs = Time.SYSTEM.milliseconds();
        String topic = "topic";
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, 90000L);
        metadata.add(topic, nowMs);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, nowMs);
        KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, keySerializer, valueSerializer, metadata, null, null, Time.SYSTEM);
        // Close in finally so a failed assertion does not leave the producer running.
        try {
            // The mocked serializers simply return the bytes of the object being serialized.
            Mockito.when(keySerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).then(invocation -> ((String) invocation.getArgument(2)).getBytes(StandardCharsets.UTF_8));
            Mockito.when(valueSerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).then(invocation -> ((String) invocation.getArgument(2)).getBytes(StandardCharsets.UTF_8));
            String value = "value";
            String key = "key";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            record.headers().add(new RecordHeader("test", "header2".getBytes(StandardCharsets.UTF_8)));
            producer.send(record, null);

            // After the send, adding another header must be rejected.
            Assertions.assertThrows(IllegalStateException.class, () -> record.headers().add(new RecordHeader("test", "test".getBytes(StandardCharsets.UTF_8))));
            // Expected value first, actual second, so failure messages read correctly.
            Assertions.assertArrayEquals("header2".getBytes(StandardCharsets.UTF_8), record.headers().lastHeader("test").value());
            Mockito.verify(valueSerializer).serialize(topic, record.headers(), value);
            Mockito.verify(keySerializer).serialize(topic, record.headers(), key);
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Checks that calling {@code close()} twice on the same producer does not throw.
     */
    @Test
    public void closeShouldBeIdempotent() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
        // First close releases the producer; the second must complete without throwing.
        producer.close();
        producer.close();
    }

    /**
     * Checks that closing a producer with a negative timeout is rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void closeWithNegativeTimestampShouldThrow() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            // The decompiled code referenced a compiler-generated lambda method that does not
            // exist in source form; the call is written out directly here.
            // NOTE(review): the exact negative value is assumed (-100 ms) — any negative
            // duration should exercise the same check; confirm against the original source.
            Assertions.assertThrows(IllegalArgumentException.class, () -> producer.close(Duration.ofMillis(-100L)));
        }
    }

    /**
     * Sends 50 records, asserts that none of the returned futures is done, calls
     * {@code flush()} and then asserts that all of them are done.
     */
    @Test
    public void testFlushCompleteSendOfInflightBatches() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("enable.idempotence", false);
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            List<Future<RecordMetadata>> pending = new ArrayList<>();
            for (int index = 0; index < 50; index++) {
                pending.add(producer.send(new ProducerRecord<>("topic", "value" + index)));
            }
            // Nothing may have completed before the flush...
            for (Future<RecordMetadata> result : pending) {
                Assertions.assertFalse(result.isDone());
            }
            producer.flush();
            // ...and everything must have completed after it.
            for (Future<RecordMetadata> result : pending) {
                Assertions.assertTrue(result.isDone());
            }
        }
    }

    /**
     * Looks up a metric of the {@code producer-metrics} group on the given producer.
     *
     * @param producer producer whose metrics registry is read
     * @param name     metric name within the {@code producer-metrics} group
     * @return the metric's current value, cast to {@link Double}
     */
    private static Double getMetricValue(KafkaProducer<?, ?> producer, String name) {
        Metrics registry = producer.metrics;
        Object currentValue = registry.metric(registry.metricName(name, "producer-metrics")).metricValue();
        return (Double) currentValue;
    }

    /**
     * Checks that {@code flush-time-ns-total} is positive after one flush and strictly larger
     * after a second flush.
     */
    @Test
    public void testFlushMeasureLatency() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            producer.flush();
            double afterFirstFlush = KafkaProducerTest.getMetricValue(producer, "flush-time-ns-total");
            Assertions.assertTrue(afterFirstFlush > 0.0);

            producer.flush();
            double afterSecondFlush = KafkaProducerTest.getMetricValue(producer, "flush-time-ns-total");
            Assertions.assertTrue(afterSecondFlush > afterFirstFlush);
        }
    }

    /**
     * Checks the metrics recording level of a producer: {@code INFO} when
     * {@code metrics.recording.level} is not set, {@code DEBUG} when it is set to
     * {@code "DEBUG"}.
     */
    @Test
    public void testMetricConfigRecordingLevel() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
        }

        props.put("metrics.recording.level", "DEBUG");
        // The decompiler had expanded this second try-with-resources into hand-written
        // close/suppress logic using undeclared variables; restored to the resource form.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
        }
    }

    /**
     * Sends a record with {@code max.request.size} set to {@code "1"} and verifies that the
     * mocked interceptors see {@code onSend} for the record and {@code onSendError} with a
     * non-null topic partition and a non-null exception.
     */
    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.request.size", "1");
        String topic = "topic";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
        long nowMs = Time.SYSTEM.milliseconds();
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, 90000L);
        metadata.add(topic, nowMs);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, nowMs);
        @SuppressWarnings("unchecked") // Mockito.mock(Class) can only hand back the raw type
        ProducerInterceptors<String, String> interceptors = Mockito.mock(ProducerInterceptors.class);
        KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);
        // Close in finally so a failed verification does not leave the producer running.
        try {
            // onSend hands the record back unchanged.
            Mockito.when(interceptors.onSend(ArgumentMatchers.any())).then(invocation -> invocation.getArgument(0));
            producer.send(record);
            Mockito.verify(interceptors).onSend(record);
            Mockito.verify(interceptors).onSendError(ArgumentMatchers.eq(record), (TopicPartition) ArgumentMatchers.notNull(), (Exception) ArgumentMatchers.notNull());
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Checks that {@code partitionsFor(null)} throws a {@link NullPointerException}.
     */
    @Test
    public void testPartitionsForWithNullTopic() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9000");
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertThrows(
                    NullPointerException.class,
                    () -> producer.partitionsFor(null));
        }
    }

    /**
     * Verifies that {@code initTransactions()} can be called again successfully when the
     * response to the first attempt only arrives after that attempt has timed out.
     *
     * <p>The first call runs on an executor thread; once a request is in flight the mock clock
     * is advanced by {@code max.block.ms} (500 ms) so the call fails with a Kafka
     * {@code TimeoutException}. The pending request is then answered and the call is repeated
     * on the test thread.
     *
     * @throws Exception if waiting on the condition, the future or the sleep fails
     */
    @Test
    public void testInitTransactionsResponseAfterTimeout() throws Exception {
        int maxBlockMs = 500;
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", maxBlockMs);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            client.prepareResponse(request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
            Future<?> future = executor.submit(() -> producer.initTransactions());
            TestUtils.waitForCondition(client::hasInFlightRequests, "Timed out while waiting for expected `InitProducerId` request to be sent");
            // Exhaust max.block.ms so the background initTransactions() call times out.
            time.sleep(maxBlockMs);
            TestUtils.assertFutureThrows(future, org.apache.kafka.common.errors.TimeoutException.class);
            // Deliver the late response, then retry.
            client.respond(this.initProducerIdResponse(1L, (short)5, Errors.NONE));
            // NOTE(review): fixed wall-clock wait, presumably to let the late response be
            // processed before the retry — confirm whether a condition wait could replace it.
            Thread.sleep(1000L);
            producer.initTransactions();
        } finally {
            // Release the worker thread; without this the pool outlives the test.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that {@code initTransactions()} throws a Kafka {@code TimeoutException} when
     * only the coordinator lookup is answered, and succeeds on a second call once both the
     * coordinator lookup and the {@code InitProducerId} response are prepared.
     */
    @Test
    public void testInitTransactionTimeout() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 500);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            // First attempt: coordinator found, but no InitProducerId response is queued.
            client.prepareResponse(
                    request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                    FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
            Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.initTransactions());

            // Second attempt: both responses are queued, so the call must complete.
            client.prepareResponse(
                    request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                    FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
            client.prepareResponse(this.initProducerIdResponse(1L, (short)5, Errors.NONE));
            producer.initTransactions();
        }
    }

    /**
     * Verifies that {@code initTransactions()} completes even though the first node of the
     * cluster was throttled on the mock client (for 5000, presumably ms) before the producer
     * was created; {@code max.block.ms} is 10000.
     */
    @Test
    public void testInitTransactionWhileThrottled() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        // Throttle the first node known to the metadata.
        Node throttledNode = metadata.fetch().nodes().get(0);
        client.throttle(throttledNode, 5000L);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
        }
    }

    /**
     * Verifies that {@code initTransactions()} surfaces a
     * {@link ClusterAuthorizationException} when the {@code InitProducerId} response carries
     * {@code CLUSTER_AUTHORIZATION_FAILED}, and that a later call succeeds once a successful
     * response has been prepared.
     *
     * @throws Exception if the retry helper fails or times out
     */
    @Test
    public void testClusterAuthorizationFailure() throws Exception {
        int maxBlockMs = 500;
        Map<String, Object> configs = new HashMap<>();
        configs.put("max.block.ms", maxBlockMs);
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("enable.idempotence", true);
        configs.put("transactional.id", "some-txn");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(500L, 5000L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
        client.prepareResponse(this.initProducerIdResponse(1L, (short)5, Errors.CLUSTER_AUTHORIZATION_FAILED));
        // try-with-resources so the producer is also closed when an assertion fails.
        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            Assertions.assertThrows(ClusterAuthorizationException.class, () -> producer.initTransactions());

            // Retry until the call goes through, for at most 1000 ms in 100 ms steps.
            client.prepareResponse(this.initProducerIdResponse(1L, (short)5, Errors.NONE));
            TestUtils.retryOnExceptionWithTimeout(1000L, 100L, () -> producer.initTransactions());
        }
    }

    /**
     * Runs init / begin / abort on a transactional producer against a mock client that has
     * successful FindCoordinator, InitProducerId and EndTxn responses queued, and expects
     * no exception.
     */
    @Test
    public void testAbortTransaction() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");

        MockTime mockTime = new MockTime(1L);
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));

        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        mockClient.prepareResponse(endTxnResponse(Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.abortTransaction();
        }
    }

    /**
     * Checks that the {@code txn-abort-time-ns-total} metric is positive after the first
     * aborted transaction and strictly larger after a second one.
     */
    @Test
    public void testMeasureAbortTransactionDuration() {
        final String abortTimeMetric = "txn-abort-time-ns-total";

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");

        MockTime mockTime = new MockTime(1L);
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();

            // First aborted transaction.
            mockClient.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            producer.abortTransaction();
            double afterFirstAbort = getMetricValue(producer, abortTimeMetric);
            Assertions.assertTrue(afterFirstAbort > 0.0);

            // Second aborted transaction: the running total must have grown.
            mockClient.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            producer.abortTransaction();
            double afterSecondAbort = getMetricValue(producer, abortTimeMetric);
            Assertions.assertTrue(afterSecondAbort > afterFirstAbort);
        }
    }

    /**
     * Sends a record with a 1000-character value while {@code max.request.size} is 1000 and
     * expects the send future to fail with {@code RecordTooLargeException} and the following
     * {@code commitTransaction()} to throw {@code KafkaException}.
     */
    @Test
    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.request.size", 1000);

        MockTime mockTime = new MockTime(1L);
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata mockedMetadata = Mockito.mock(ProducerMetadata.class);
        MockClient mockClient = new MockClient(mockTime, mockedMetadata);
        mockClient.updateMetadata(initialMetadata);
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        Mockito.when(mockedMetadata.fetch()).thenReturn(onePartitionCluster);

        // Build a value made of 1000 '*' characters.
        char[] stars = new char[1000];
        Arrays.fill(stars, '*');
        String oversizedValue = new String(stars);
        ProducerRecord<String, String> oversizedRecord = new ProducerRecord<>("topic", "large string", oversizedValue);

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), mockedMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            mockClient.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(oversizedRecord), RecordTooLargeException.class);
            Assertions.assertThrows(KafkaException.class, () -> producer.commitTransaction());
        }
    }

    /**
     * Sends to a topic while the mocked metadata only ever returns {@code emptyCluster}.
     * After more than five {@code fetch()} calls the mock clock jumps 70 seconds, past the
     * 60-second {@code max.block.ms}. The send future is expected to fail with the Kafka
     * {@code TimeoutException} and {@code commitTransaction()} with {@code KafkaException}.
     */
    @Test
    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9999");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", 60000);

        ProducerRecord<String, String> valueOnlyRecord = new ProducerRecord<>("topic", "value");
        ProducerMetadata mockedMetadata = Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        MockClient mockClient = new MockClient(mockTime, mockedMetadata);
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        AtomicInteger fetchCalls = new AtomicInteger(0);
        Mockito.when(mockedMetadata.fetch()).then(invocation -> {
            fetchCalls.incrementAndGet();
            if (fetchCalls.get() > 5) {
                // Jump the clock forward by 70 seconds.
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return emptyCluster;
        });

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), mockedMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(valueOnlyRecord), org.apache.kafka.common.errors.TimeoutException.class);
            Assertions.assertThrows(KafkaException.class, () -> producer.commitTransaction());
        }
    }

    /**
     * Sends a record addressed to partition 2 while the mocked metadata always returns
     * {@code onePartitionCluster}. After more than five {@code fetch()} calls the mock clock
     * jumps 70 seconds, past the 60-second {@code max.block.ms}. The send future is expected
     * to fail with the Kafka {@code TimeoutException} and {@code commitTransaction()} with
     * {@code KafkaException}.
     */
    @Test
    public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9999");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", 60000);

        ProducerRecord<String, String> outOfRangeRecord = new ProducerRecord<>("topic", Integer.valueOf(2), null, "value");
        ProducerMetadata mockedMetadata = Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        MockClient mockClient = new MockClient(mockTime, mockedMetadata);
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        AtomicInteger fetchCalls = new AtomicInteger(0);
        Mockito.when(mockedMetadata.fetch()).then(invocation -> {
            fetchCalls.incrementAndGet();
            if (fetchCalls.get() > 5) {
                // Jump the clock forward by 70 seconds.
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return onePartitionCluster;
        });

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), mockedMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(outOfRangeRecord), org.apache.kafka.common.errors.TimeoutException.class);
            Assertions.assertThrows(KafkaException.class, () -> producer.commitTransaction());
        }
    }

    /**
     * Sends to the topic name {@code "topic abc"} inside a transaction, with a prepared
     * metadata update that reports {@code INVALID_TOPIC_EXCEPTION} for it. The send future
     * is expected to fail with {@code InvalidTopicException} and
     * {@code commitTransaction()} with {@code KafkaException}.
     */
    @Test
    public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
        final String invalidTopicName = "topic abc";

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", "15000");

        MockTime mockTime = new MockTime();
        MetadataResponse noTopicsResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        producerMetadata.updateWithCurrentRequestVersion(noTopicsResponse, false, mockTime.milliseconds());

        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        ProducerRecord<String, String> invalidTopicRecord = new ProducerRecord<>(invalidTopicName, "HelloKafka");

        // Metadata update that flags the topic as invalid, reusing the initial brokers/cluster/controller.
        List<MetadataResponse.TopicMetadata> invalidTopicMetadata = new ArrayList<>();
        invalidTopicMetadata.add(new MetadataResponse.TopicMetadata(
                Errors.INVALID_TOPIC_EXCEPTION, invalidTopicName, false, Collections.emptyList()));
        List<Node> brokers = new ArrayList<>(noTopicsResponse.brokers());
        MetadataResponse invalidTopicResponse = RequestTestUtils.metadataResponse(
                brokers, noTopicsResponse.clusterId(), noTopicsResponse.controller().id(), invalidTopicMetadata);
        mockClient.prepareMetadataUpdate(invalidTopicResponse);

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(invalidTopicRecord), InvalidTopicException.class);
            Assertions.assertThrows(KafkaException.class, () -> producer.commitTransaction());
        }
    }

    /**
     * Runs init / begin / sendOffsetsToTransaction / commit against a throttled node and
     * uses a request matcher to check that the TxnOffsetCommit request carries the group id
     * {@code "group"}.
     */
    @Test
    public void testSendTxnOffsetsWithGroupId() {
        final String groupId = "group";

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", 10000);

        MockTime mockTime = new MockTime(1L);
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));

        Node throttledNode = producerMetadata.fetch().nodes().get(0);
        mockClient.throttle(throttledNode, 5000L);

        // Responses in the order they were prepared by the original test.
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        mockClient.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        Map<TopicPartition, Errors> commitResult = Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE);
        mockClient.prepareResponse(
                request -> ((TxnOffsetCommitRequest) request).data().groupId().equals(groupId),
                txnOffsetsCommitResponse(commitResult));
        mockClient.prepareResponse(endTxnResponse(Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(groupId));
            producer.commitTransaction();
        }
    }

    /**
     * Asserts that the producer metric {@code name} currently reads at least {@code floor}.
     *
     * @param producer producer whose metric is read
     * @param name     metric name passed to {@code getMetricValue}
     * @param floor    minimum acceptable value (inclusive)
     */
    private void assertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
        double observed = getMetricValue(producer, name);
        Assertions.assertTrue(observed >= floor);
    }

    /**
     * Reads the producer metric {@code name}, asserts that it is at least {@code floor},
     * and returns the value that was read.
     *
     * @param producer producer whose metric is read
     * @param name     metric name passed to {@code getMetricValue}
     * @param floor    minimum acceptable value (inclusive)
     * @return the metric value that was observed
     */
    private double getAndAssertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
        final double observed = getMetricValue(producer, name);
        Assertions.assertTrue(observed >= floor);
        return observed;
    }

    /**
     * Uses a mock clock with a one-second auto-tick and checks that each transactional
     * duration metric (init, begin, send-offsets, commit) is at least one tick after the
     * first use, and has grown by at least one more tick after a second transaction.
     */
    @Test
    public void testMeasureTransactionDurations() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", 10000);

        Duration tick = Duration.ofSeconds(1L);
        final double tickNanos = tick.toNanos();
        MockTime mockTime = new MockTime(tick.toMillis());

        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        final TopicPartition partition = new TopicPartition("topic", 0);

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            assertDurationAtLeast(producer, "txn-init-time-ns-total", tickNanos);

            // First transaction.
            mockClient.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
            mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
            mockClient.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(partition, Errors.NONE)));
            mockClient.prepareResponse(endTxnResponse(Errors.NONE));

            producer.beginTransaction();
            double beginAfterFirst = getAndAssertDurationAtLeast(producer, "txn-begin-time-ns-total", tickNanos);
            producer.sendOffsetsToTransaction(
                    Collections.singletonMap(partition, new OffsetAndMetadata(5L)), new ConsumerGroupMetadata("group"));
            double sendOffsetsAfterFirst = getAndAssertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", tickNanos);
            producer.commitTransaction();
            double commitAfterFirst = getAndAssertDurationAtLeast(producer, "txn-commit-time-ns-total", tickNanos);

            // Second transaction: every total must have advanced by at least one more tick.
            mockClient.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
            mockClient.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(partition, Errors.NONE)));
            mockClient.prepareResponse(endTxnResponse(Errors.NONE));

            producer.beginTransaction();
            assertDurationAtLeast(producer, "txn-begin-time-ns-total", beginAfterFirst + tickNanos);
            producer.sendOffsetsToTransaction(
                    Collections.singletonMap(partition, new OffsetAndMetadata(10L)), new ConsumerGroupMetadata("group"));
            assertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", sendOffsetsAfterFirst + tickNanos);
            producer.commitTransaction();
            assertDurationAtLeast(producer, "txn-commit-time-ns-total", commitAfterFirst + tickNanos);
        }
    }

    /**
     * Runs init / begin / sendOffsetsToTransaction / commit with full consumer group
     * metadata and uses a request matcher to check that the TxnOffsetCommit request carries
     * the same group id, member id, generation id and group instance id.
     *
     * <p>The mock client advertises TXN_OFFSET_COMMIT versions 0 through {@code maxVersion}.
     * The previously unused {@code maxVersion} local now feeds that call instead of a
     * duplicated literal.
     */
    @Test
    public void testSendTxnOffsetsWithGroupMetadata() {
        final short maxVersion = (short) 3;
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.TXN_OFFSET_COMMIT.id, (short) 0, maxVersion));

        Node node = metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));

        String groupId = "group";
        String memberId = "member";
        int generationId = 5;
        String groupInstanceId = "instance";
        client.prepareResponse(request -> {
            TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest) request).data();
            return data.groupId().equals(groupId)
                && data.memberId().equals(memberId)
                && data.generationId() == generationId
                && data.groupInstanceId().equals(groupInstanceId);
        }, txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
        client.prepareResponse(endTxnResponse(Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(
                configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            ConsumerGroupMetadata groupMetadata =
                new ConsumerGroupMetadata(groupId, generationId, memberId, Optional.of(groupInstanceId));
            producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata);
            producer.commitTransaction();
        }
    }

    /**
     * Passes {@code null} group metadata to {@code verifyInvalidGroupMetadata}.
     */
    @Test
    public void testNullGroupMetadataInSendOffsets() {
        ConsumerGroupMetadata absentMetadata = null;
        verifyInvalidGroupMetadata(absentMetadata);
    }

    /**
     * Passes group metadata with generation id 2, an empty member id and no group instance
     * id to {@code verifyInvalidGroupMetadata}.
     */
    @Test
    public void testInvalidGenerationIdAndMemberIdCombinedInSendOffsets() {
        ConsumerGroupMetadata inconsistentMetadata = new ConsumerGroupMetadata("group", 2, "", Optional.empty());
        verifyInvalidGroupMetadata(inconsistentMetadata);
    }

    /**
     * Stubs {@code CommonClientConfigs.telemetryReporter} to return a mocked reporter whose
     * telemetry sender yields a known {@code Uuid}, then checks that
     * {@code KafkaProducer.clientInstanceId} returns that same value.
     */
    @Test
    public void testClientInstanceId() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9999");

        ClientTelemetryReporter reporterMock = Mockito.mock(ClientTelemetryReporter.class);
        reporterMock.configure(ArgumentMatchers.any());

        try (MockedStatic<CommonClientConfigs> staticConfigs =
                 Mockito.mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
            staticConfigs
                .when(() -> CommonClientConfigs.telemetryReporter(ArgumentMatchers.anyString(), ArgumentMatchers.any()))
                .thenReturn(Optional.of(reporterMock));

            ClientTelemetrySender senderMock = Mockito.mock(ClientTelemetrySender.class);
            Uuid expectedUuid = Uuid.randomUuid();
            Mockito.when(reporterMock.telemetrySender()).thenReturn(senderMock);
            Mockito.when(senderMock.clientInstanceId(ArgumentMatchers.any())).thenReturn(Optional.of(expectedUuid));

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer())) {
                Uuid actualUuid = producer.clientInstanceId(Duration.ofMillis(0L));
                Assertions.assertEquals(expectedUuid, actualUuid);
            }
        }
    }

    /**
     * Checks that {@code clientInstanceId} rejects a negative timeout with an
     * {@code IllegalArgumentException} carrying the expected message.
     *
     * <p>The producer is held in try-with-resources so that it is closed even when an
     * assertion fails.
     */
    @Test
    public void testClientInstanceIdInvalidTimeout() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Exception exception = Assertions.assertThrows(
                IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1L)));
            Assertions.assertEquals("The timeout cannot be negative.", exception.getMessage());
        }
    }

    /**
     * Checks that {@code clientInstanceId} throws {@code IllegalStateException} with the
     * expected message when {@code enable.metrics.push} is set to {@code false}.
     *
     * <p>The producer is held in try-with-resources so that it is closed even when an
     * assertion fails.
     */
    @Test
    public void testClientInstanceIdNoTelemetryReporterRegistered() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("enable.metrics.push", "false");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Exception exception = Assertions.assertThrows(
                IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0L)));
            Assertions.assertEquals(
                "Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
        }
    }

    /**
     * Starts a transaction on a freshly initialised transactional producer and asserts that
     * {@code sendOffsetsToTransaction} with an empty offsets map and the given group
     * metadata throws {@code IllegalArgumentException}.
     *
     * @param groupMetadata the group metadata expected to be rejected (may be {@code null})
     */
    private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("transactional.id", "some.id");
        producerProps.put("max.block.ms", 10000);

        MockTime mockTime = new MockTime(1L);
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));

        Node throttledNode = producerMetadata.fetch().nodes().get(0);
        mockClient.throttle(throttledNode, 5000L);

        mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            producer.initTransactions();
            producer.beginTransaction();
            Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata));
        }
    }

    /**
     * Builds an {@code InitProducerIdResponse} with the given producer id, epoch and error
     * code, and a throttle time of 0 ms.
     *
     * @param producerId    producer id to report
     * @param producerEpoch producer epoch to report
     * @param error         error whose code is written into the response
     * @return the assembled response
     */
    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
        InitProducerIdResponseData payload = new InitProducerIdResponseData();
        payload.setErrorCode(error.code());
        payload.setProducerEpoch(producerEpoch);
        payload.setProducerId(producerId);
        payload.setThrottleTimeMs(0);
        return new InitProducerIdResponse(payload);
    }

    /**
     * Builds an {@code AddOffsetsToTxnResponse} with the given error code and a throttle
     * time of 10 ms.
     *
     * @param error error whose code is written into the response
     * @return the assembled response
     */
    private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors error) {
        AddOffsetsToTxnResponseData payload = new AddOffsetsToTxnResponseData();
        payload.setErrorCode(error.code());
        payload.setThrottleTimeMs(10);
        return new AddOffsetsToTxnResponse(payload);
    }

    /**
     * Builds a {@code TxnOffsetCommitResponse} from the given per-partition errors, passing
     * 10 as the first constructor argument.
     *
     * @param errorMap error to report for each topic partition
     * @return the assembled response
     */
    private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> errorMap) {
        // NOTE(review): presumably the throttle time in ms, matching the sibling helpers; confirm.
        final int firstCtorArg = 10;
        return new TxnOffsetCommitResponse(firstCtorArg, errorMap);
    }

    /**
     * Builds an {@code EndTxnResponse} with the given error code and a throttle time of 0 ms.
     *
     * @param error error whose code is written into the response
     * @return the assembled response
     */
    private EndTxnResponse endTxnResponse(Errors error) {
        EndTxnResponseData payload = new EndTxnResponseData();
        payload.setErrorCode(error.code());
        payload.setThrottleTimeMs(0);
        return new EndTxnResponse(payload);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that {@code initTransactions()} times out when the mock client has no response
     * prepared ({@code max.block.ms} is 5), and that {@code beginTransaction()} afterwards
     * throws {@code IllegalStateException}.
     *
     * <p>Both assertions run inside the try block so that the producer is force-closed in
     * the finally clause even when the first assertion fails.
     */
    @Test
    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 5);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(
            configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        try {
            Assertions.assertThrows(
                org.apache.kafka.common.errors.TimeoutException.class, () -> producer.initTransactions());
            // Other transactional operations must be rejected after initTransactions failed.
            Assertions.assertThrows(IllegalStateException.class, () -> producer.beginTransaction());
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Sends to the topic name {@code "topic abc"} with a prepared metadata update that
     * reports {@code INVALID_TOPIC_EXCEPTION} for it. Checks that the cluster's invalid
     * topic set contains exactly that name and that the send future fails with
     * {@code InvalidTopicException}.
     *
     * <p>The producer is force-closed in a finally clause so that it is closed even when an
     * assertion fails.
     */
    @Test
    public void testSendToInvalidTopic() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "15000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(
            configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        try {
            String invalidTopicName = "topic abc";
            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");

            List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
            topicMetadata.add(new MetadataResponse.TopicMetadata(
                Errors.INVALID_TOPIC_EXCEPTION, invalidTopicName, false, Collections.emptyList()));
            MetadataResponse updateResponse = RequestTestUtils.metadataResponse(
                new ArrayList<>(initialUpdateResponse.brokers()),
                initialUpdateResponse.clusterId(),
                initialUpdateResponse.controller().id(),
                topicMetadata);
            client.prepareMetadataUpdate(updateResponse);

            Future<RecordMetadata> future = producer.send(record);
            Assertions.assertEquals(
                Collections.singleton(invalidTopicName),
                metadata.fetch().invalidTopics(),
                "Cluster has incorrect invalid topic list.");
            TestUtils.assertFutureError(future, InvalidTopicException.class);
        } finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a {@code send()} on a background thread for a topic that is absent from the
     * metadata, waits until the topic has been added to the metadata, force-closes the
     * producer, and expects the exception caught around the send to be exactly
     * {@code KafkaException}.
     */
    @Test
    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
        final String topicName = "test";
        final Time systemTime = Time.SYSTEM;

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("max.block.ms", Long.MAX_VALUE);

        MetadataResponse noTopicsResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata producerMetadata = new ProducerMetadata(
            0L, 0L, Long.MAX_VALUE, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners(), systemTime);
        producerMetadata.updateWithCurrentRequestVersion(noTopicsResponse, false, systemTime.milliseconds());
        MockClient mockClient = new MockClient(systemTime, producerMetadata);
        KafkaProducer<String, String> producer = kafkaProducer(
            producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, systemTime);

        ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> sendFailure = new AtomicReference<>();
        try {
            sendExecutor.submit(() -> {
                try {
                    producer.send(new ProducerRecord<>(topicName, "key", "value"));
                    Assertions.fail();
                } catch (Exception e) {
                    // Record the exception for the main thread to inspect.
                    sendFailure.set(e);
                }
            });

            TestUtils.waitForCondition(
                () -> producerMetadata.containsTopic(topicName),
                "Timeout when waiting for topic to be added to metadata");
            producer.close(Duration.ofMillis(0L));
            TestUtils.waitForCondition(
                () -> sendFailure.get() != null,
                "No producer exception within timeout");
            Assertions.assertEquals(KafkaException.class, sendFailure.get().getClass());
        } finally {
            sendExecutor.shutdownNow();
        }
    }

    /**
     * Closes a transactional producer and then expects {@code initTransactions()} to throw
     * {@code IllegalStateException}.
     */
    @Test
    public void testTransactionalMethodThrowsWhenSenderClosed() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("transactional.id", "this-is-a-transactional-id");
        producerProps.put("bootstrap.servers", "localhost:9000");

        MockTime mockTime = new MockTime();
        MetadataResponse noTopicsResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata producerMetadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        producerMetadata.updateWithCurrentRequestVersion(noTopicsResponse, false, mockTime.milliseconds());
        MockClient mockClient = new MockClient(mockTime, producerMetadata);

        KafkaProducer<String, String> closedProducer = kafkaProducer(
            producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime);
        closedProducer.close();

        Assertions.assertThrows(IllegalStateException.class, () -> closedProducer.initTransactions());
    }

    /**
     * Calls {@code initTransactions()} on a background thread while the mock client has no
     * response prepared, waits for one request to reach the client, then closes the producer
     * with a one-second timeout. The background call is expected to fail with
     * {@code KafkaException}.
     *
     * <p>The result of the latch wait is asserted. Previously it was ignored, so the test
     * passed even if the background assertion failed or never finished. The executor is
     * also shut down in a finally clause so its worker thread does not outlive the test.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testCloseIsForcedOnPendingFindCoordinator() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        KafkaProducer<String, String> producer = kafkaProducer(
            configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch assertionDoneLatch = new CountDownLatch(1);
        try {
            executorService.submit(() -> {
                Assertions.assertThrows(KafkaException.class, () -> producer.initTransactions());
                // Only reached when the expected exception was thrown.
                assertionDoneLatch.countDown();
            });

            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));
            Assertions.assertTrue(
                assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                "initTransactions() did not fail with KafkaException within the timeout");
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Checks that closing the producer with a bounded timeout makes a concurrently blocked
     * {@code initTransactions()} call fail with a {@link KafkaException}.
     *
     * <p>A successful FindCoordinator response is prepared up front, and no further response is
     * queued, so the request that follows it stays unanswered until the producer is closed.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *         background assertion to complete
     */
    @Test
    public void testCloseIsForcedOnPendingInitProducerId() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch assertionDoneLatch = new CountDownLatch(1);
            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
            executorService.submit(() -> {
                // The latch is only released when the expected exception was actually thrown.
                Assertions.assertThrows(KafkaException.class, producer::initTransactions);
                assertionDoneLatch.countDown();
            });
            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));
            // await() returns false on timeout; without checking it a failed or hung background
            // assertion would go unnoticed and the test would pass vacuously.
            Assertions.assertTrue(assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                    "initTransactions() did not fail with KafkaException after the producer was closed");
        } finally {
            // Do not leak the worker thread, whatever the outcome.
            executorService.shutdownNow();
        }
    }

    /**
     * Checks that closing the producer with a bounded timeout makes a concurrently blocked
     * {@code initTransactions()} call fail with a {@link KafkaException}.
     *
     * <p>NOTE(review): despite its name, this test never issues an add-offsets call; its steps
     * are the same as {@code testCloseIsForcedOnPendingInitProducerId}. Confirm whether a
     * dedicated add-offsets scenario was intended.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *         background assertion to complete
     */
    @Test
    public void testCloseIsForcedOnPendingAddOffsetRequest() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch assertionDoneLatch = new CountDownLatch(1);
            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
            executorService.submit(() -> {
                // The latch is only released when the expected exception was actually thrown.
                Assertions.assertThrows(KafkaException.class, producer::initTransactions);
                assertionDoneLatch.countDown();
            });
            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));
            // await() returns false on timeout; without checking it a failed or hung background
            // assertion would go unnoticed and the test would pass vacuously.
            Assertions.assertTrue(assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                    "initTransactions() did not fail with KafkaException after the producer was closed");
        } finally {
            // Do not leak the worker thread, whatever the outcome.
            executorService.shutdownNow();
        }
    }

    /**
     * Checks that a metric registered through the producer's {@code metrics} registry is
     * exposed on the platform MBean server under the {@code kafka.producer} JMX domain.
     *
     * @throws Exception if the JMX object name is malformed or the MBean cannot be found
     */
    @Test
    public void testProducerJmxPrefix() throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("client.id", "client-1");
        // try-with-resources: the producer must be closed even when the JMX lookup or the
        // assertion throws, otherwise its resources leak into later tests.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            MetricName testMetricName = producer.metrics.metricName("test-metric", "grp1", "test metric");
            producer.metrics.addMetric(testMetricName, new Avg());
            Assertions.assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
        }
    }

    /**
     * Creates a {@link ProducerMetadata} for tests, backed by the system clock, a fresh
     * {@link LogContext} and an empty {@link ClusterResourceListeners}.
     *
     * @param refreshBackoffMs    first constructor argument, forwarded unchanged
     * @param refreshBackoffMaxMs second constructor argument, forwarded unchanged
     * @param expirationMs        third constructor argument, forwarded unchanged
     * @return the newly constructed metadata instance
     */
    private static ProducerMetadata newMetadata(long refreshBackoffMs, long refreshBackoffMaxMs, long expirationMs) {
        // NOTE(review): 300000L is presumably the metadata idle timeout (5 minutes) - confirm
        // against the ProducerMetadata constructor.
        final long fourthCtorArg = 300000L;
        LogContext logContext = new LogContext();
        ClusterResourceListeners listeners = new ClusterResourceListeners();
        return new ProducerMetadata(
                refreshBackoffMs,
                refreshBackoffMaxMs,
                expirationMs,
                fourthCtorArg,
                logContext,
                listeners,
                Time.SYSTEM);
    }

    /**
     * Checks that every configurable plug-in (key serializer, value serializer, partitioner
     * and interceptor) is configured with the same client id the producer reports, even though
     * no {@code client.id} is set in the properties.
     *
     * <p>Each plug-in appends the {@code client.id} it sees to {@code CLIENT_IDS}; four entries
     * are expected, all equal to {@code producer.getClientId()}.
     */
    @Test
    public void configurableObjectsShouldSeeGeneratedClientId() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("key.serializer", SerializerForClientId.class.getName());
        props.put("value.serializer", SerializerForClientId.class.getName());
        props.put("partitioner.class", PartitionerForClientId.class.getName());
        props.put("interceptor.classes", ProducerInterceptorForClientId.class.getName());
        // try-with-resources: a failing assertion must not leave the producer open.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            String clientId = producer.getClientId();
            Assertions.assertNotNull(clientId);
            Assertions.assertNotEquals(0, clientId.length());
            Assertions.assertEquals(4, CLIENT_IDS.size());
            CLIENT_IDS.forEach(id -> Assertions.assertEquals(id, producer.getClientId()));
        }
    }

    /**
     * Checks that {@code ssl.protocol} is reported by {@code ProducerConfig.unused()} both
     * before a producer is built from the config and while that producer is open.
     */
    @Test
    public void testUnusedConfigs() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9999");
        producerProps.put("ssl.protocol", "TLS");

        StringSerializer keySerializer = new StringSerializer();
        StringSerializer valueSerializer = new StringSerializer();
        ProducerConfig producerConfig = new ProducerConfig(
                ProducerConfig.appendSerializerToConfig(producerProps, keySerializer, valueSerializer));

        Assertions.assertTrue(producerConfig.unused().contains("ssl.protocol"));

        // The producer itself is never used; it only needs to exist while the second check runs.
        try (KafkaProducer<byte[], byte[]> ignored = new KafkaProducer<>(producerConfig, null, null, null, null, null, Time.SYSTEM)) {
            Assertions.assertTrue(producerConfig.unused().contains("ssl.protocol"));
        }
    }

    /**
     * Checks that constructing a {@link ProducerRecord} with a {@code null} topic is rejected
     * with an {@link IllegalArgumentException}.
     */
    @Test
    public void testNullTopicName() {
        byte[] keyBytes = "key".getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = "value".getBytes(StandardCharsets.UTF_8);
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> new ProducerRecord<>(null, 1, keyBytes, valueBytes));
    }

    /**
     * Sends a record to a topic whose name contains a space and checks what the send callback
     * receives: a non-null exception and a non-null {@link RecordMetadata} that carries the
     * topic name but no offset, timestamp, sizes or partition. It also checks that the mock
     * interceptor's acknowledgement counter reads 1 afterwards.
     */
    @Test
    public void testCallbackAndInterceptorHandleError() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("max.block.ms", "1000");
        producerProps.put("interceptor.classes", MockProducerInterceptor.class.getName());
        producerProps.put("mock.interceptor.append", "something");

        MockTime mockTime = new MockTime();
        ProducerMetadata producerMetadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        String invalidTopicName = "topic abc";
        List<ProducerInterceptor<String, String>> interceptorList = Collections.singletonList(new MockProducerInterceptor());
        ProducerInterceptors<String, String> producerInterceptors = new ProducerInterceptors<>(interceptorList);

        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, producerInterceptors, mockTime)) {
            ProducerRecord<String, String> badRecord = new ProducerRecord<>(invalidTopicName, "HelloKafka");
            Callback verifyingCallback = (metadata, error) -> {
                Assertions.assertNotNull(error);
                Assertions.assertNotNull(metadata);
                // The topic is the only piece of metadata expected to be populated.
                Assertions.assertNotNull(metadata.topic(), "Topic name should be valid even on send failure");
                Assertions.assertEquals(invalidTopicName, metadata.topic());
                Assertions.assertFalse(metadata.hasOffset());
                Assertions.assertEquals(-1L, metadata.offset());
                Assertions.assertFalse(metadata.hasTimestamp());
                Assertions.assertEquals(-1L, metadata.timestamp());
                Assertions.assertEquals(-1, metadata.serializedKeySize());
                Assertions.assertEquals(-1, metadata.serializedValueSize());
                Assertions.assertEquals(-1, metadata.partition());
            };
            producer.send(badRecord, verifyingCallback);
            Assertions.assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
        }
    }

    /**
     * Checks that {@code send()} throws an {@link IllegalArgumentException} when the configured
     * partitioner ({@code BuggyPartitioner}) is in use.
     */
    @Test
    public void negativePartitionShouldThrow() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9000");
        producerProps.put("partitioner.class", BuggyPartitioner.class.getName());

        MockTime mockTime = new MockTime(1L);
        MetadataResponse topicMetadata = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata producerMetadata = KafkaProducerTest.newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient mockClient = new MockClient(mockTime, producerMetadata);
        mockClient.updateMetadata(topicMetadata);

        try (KafkaProducer<String, String> producer = KafkaProducerTest.kafkaProducer(
                producerProps, new StringSerializer(), new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
            Assertions.assertThrows(
                    IllegalArgumentException.class,
                    () -> producer.send(new ProducerRecord<>("topic", "key", "value")));
        }
    }

    /**
     * Checks that sending a record through a producer built from mocked collaborators returns
     * the future handed out by the stubbed accumulator, leaves it incomplete, and registers the
     * selected partition with the transaction manager via {@code maybeAddPartition}.
     *
     * @throws Exception declared because the stubbing helper may throw
     */
    @Test
    public void testPartitionAddedToTransaction() throws Exception {
        String topic = "foo";
        StringSerializer stringSerializer = new StringSerializer();
        KafkaProducerTestContext<String> context = new KafkaProducerTestContext<>(this.testInfo, stringSerializer);
        TopicPartition partitionZero = new TopicPartition(topic, 0);
        Cluster singleTopicCluster = TestUtils.singletonCluster(topic, 1);

        Mockito.when(context.sender.isRunning()).thenReturn(true);
        Mockito.when(context.metadata.fetch()).thenReturn(singleTopicCluster);

        long now = context.time.milliseconds();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, now, "key", "value");
        FutureRecordMetadata expectedFuture = this.expectAppend(context, record, partitionZero, singleTopicCluster);

        try (KafkaProducer<String, String> producer = context.newKafkaProducer()) {
            Assertions.assertEquals(expectedFuture, producer.send(record));
            Assertions.assertFalse(expectedFuture.isDone());
            Mockito.verify(context.transactionManager).maybeAddPartition(partitionZero);
        }
    }

    /**
     * Checks the retry path stubbed by {@code expectAppendWithAbortForNewBatch}: the partitioner
     * is notified through {@code onNewBatch}, the first partition is never registered with the
     * transaction manager, and the second partition is.
     *
     * @throws Exception declared because the stubbing helper may throw
     */
    @Test
    public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
        String topic = "foo";
        StringSerializer stringSerializer = new StringSerializer();
        KafkaProducerTestContext<String> context = new KafkaProducerTestContext<>(this.testInfo, stringSerializer);
        TopicPartition firstChoice = new TopicPartition(topic, 0);
        TopicPartition retryChoice = new TopicPartition(topic, 1);
        Cluster twoPartitionCluster = TestUtils.singletonCluster(topic, 2);

        Mockito.when(context.sender.isRunning()).thenReturn(true);
        Mockito.when(context.metadata.fetch()).thenReturn(twoPartitionCluster);

        long now = context.time.milliseconds();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, now, "key", "value");
        FutureRecordMetadata expectedFuture =
                this.expectAppendWithAbortForNewBatch(context, record, firstChoice, retryChoice, twoPartitionCluster);

        try (KafkaProducer<String, String> producer = context.newKafkaProducer()) {
            Assertions.assertEquals(expectedFuture, producer.send(record));
            Assertions.assertFalse(expectedFuture.isDone());
            Mockito.verify(context.partitioner).onNewBatch(topic, twoPartitionCluster, 0);
            // Only the partition chosen on the retry may end up in the transaction.
            Mockito.verify(context.transactionManager, Mockito.never()).maybeAddPartition(firstChoice);
            Mockito.verify(context.transactionManager).maybeAddPartition(retryChoice);
        }
    }

    /**
     * Stubs the context's mocked partitioner and accumulator for a single append of
     * {@code record}.
     *
     * <p>The partitioner is stubbed to return {@code initialSelectedPartition}. The accumulator
     * is stubbed so that an append for that topic/partition, the record's timestamp and the
     * serialized key/value reports the partition back through the append callbacks and returns
     * a result wrapping the future created here.
     *
     * @param ctx                      test context holding the mocks and the serializer
     * @param record                   record whose key, value and timestamp drive the stubbing
     * @param initialSelectedPartition partition the partitioner mock will select
     * @param cluster                  cluster instance the partitioner stub is matched against
     * @return the future the stubbed accumulator will hand out
     * @throws InterruptedException declared by the stubbed {@code append} call
     */
    private <T> FutureRecordMetadata expectAppend(KafkaProducerTestContext<T> ctx, ProducerRecord<T, T> record, TopicPartition initialSelectedPartition, Cluster cluster) throws InterruptedException {
        byte[] keyBytes = ctx.serializer.serialize("topic", record.key());
        byte[] valueBytes = ctx.serializer.serialize("topic", record.value());

        // Fall back to the context clock when the record carries no explicit timestamp.
        long timestamp;
        if (record.timestamp() == null) {
            timestamp = ctx.time.milliseconds();
        } else {
            timestamp = record.timestamp().longValue();
        }

        ProduceRequestResult produceResult = new ProduceRequestResult(initialSelectedPartition);
        FutureRecordMetadata expectedFuture =
                new FutureRecordMetadata(produceResult, 5, timestamp, keyBytes.length, valueBytes.length, ctx.time);

        Mockito.when(ctx.partitioner.partition(
                        initialSelectedPartition.topic(), record.key(), keyBytes, record.value(), valueBytes, cluster))
                .thenReturn(initialSelectedPartition.partition());

        // Matchers must stay inline and in argument order for Mockito to pair them correctly.
        Mockito.when(ctx.accumulator.append(
                        ArgumentMatchers.eq(initialSelectedPartition.topic()),
                        ArgumentMatchers.eq(initialSelectedPartition.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(invocation -> {
                    // Argument 6 is the AppendCallbacks instance matched above.
                    RecordAccumulator.AppendCallbacks appendCallbacks =
                            (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
                    appendCallbacks.setPartition(initialSelectedPartition.partition());
                    return new RecordAccumulator.RecordAppendResult(expectedFuture, false, false, false, 0);
                });
        return expectedFuture;
    }

    /**
     * Stubs the context's mocked partitioner and accumulator for an append that is first
     * turned away on {@code initialSelectedPartition} and then accepted on
     * {@code retrySelectedPartition}.
     *
     * <p>The partitioner returns the initial partition on its first call and the retry
     * partition afterwards. The first accumulator stub (ninth argument {@code true}) answers
     * with a result that has no future; the second (ninth argument {@code false}) answers with
     * a result wrapping the future created here.
     * NOTE(review): the fourth {@code RecordAppendResult} flag is presumably "abort for new
     * batch", as the method name suggests - confirm against {@code RecordAccumulator}.
     *
     * @param ctx                      test context holding the mocks and the serializer
     * @param record                   record whose key, value and timestamp drive the stubbing
     * @param initialSelectedPartition partition returned by the partitioner on the first call
     * @param retrySelectedPartition   partition returned by the partitioner on later calls
     * @param cluster                  cluster instance the partitioner stub is matched against
     * @return the future the second accumulator stub will hand out
     * @throws InterruptedException declared by the stubbed {@code append} call
     */
    private <T> FutureRecordMetadata expectAppendWithAbortForNewBatch(KafkaProducerTestContext<T> ctx, ProducerRecord<T, T> record, TopicPartition initialSelectedPartition, TopicPartition retrySelectedPartition, Cluster cluster) throws InterruptedException {
        byte[] keyBytes = ctx.serializer.serialize("topic", record.key());
        byte[] valueBytes = ctx.serializer.serialize("topic", record.value());

        // Fall back to the context clock when the record carries no explicit timestamp.
        long timestamp;
        if (record.timestamp() == null) {
            timestamp = ctx.time.milliseconds();
        } else {
            timestamp = record.timestamp().longValue();
        }

        ProduceRequestResult produceResult = new ProduceRequestResult(retrySelectedPartition);
        FutureRecordMetadata expectedFuture =
                new FutureRecordMetadata(produceResult, 0, timestamp, keyBytes.length, valueBytes.length, ctx.time);

        Mockito.when(ctx.partitioner.partition(
                        initialSelectedPartition.topic(), record.key(), keyBytes, record.value(), valueBytes, cluster))
                .thenReturn(initialSelectedPartition.partition())
                .thenReturn(retrySelectedPartition.partition());

        // First attempt: append on the initial partition with the ninth argument set to true.
        // Matchers must stay inline and in argument order for Mockito to pair them correctly.
        Mockito.when(ctx.accumulator.append(
                        ArgumentMatchers.eq(initialSelectedPartition.topic()),
                        ArgumentMatchers.eq(initialSelectedPartition.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(invocation -> {
                    RecordAccumulator.AppendCallbacks appendCallbacks =
                            (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
                    appendCallbacks.setPartition(initialSelectedPartition.partition());
                    return new RecordAccumulator.RecordAppendResult(null, false, false, true, 0);
                });

        // Second attempt: append on the retry partition with the ninth argument set to false.
        Mockito.when(ctx.accumulator.append(
                        ArgumentMatchers.eq(retrySelectedPartition.topic()),
                        ArgumentMatchers.eq(retrySelectedPartition.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(false),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(invocation -> {
                    RecordAccumulator.AppendCallbacks appendCallbacks =
                            (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
                    appendCallbacks.setPartition(retrySelectedPartition.partition());
                    return new RecordAccumulator.RecordAppendResult(expectedFuture, false, true, false, 0);
                });
        return expectedFuture;
    }

    /**
     * Checks how producer construction reacts to two combinations of
     * {@code delivery.timeout.ms}, {@code linger.ms} and {@code request.timeout.ms}:
     * 1000/1000/1 must fail with a {@link KafkaException}, 1000/999/1 must succeed.
     * NOTE(review): presumably the rule under test is
     * {@code delivery.timeout.ms >= linger.ms + request.timeout.ms} - confirm against
     * {@code ProducerConfig}.
     */
    @Test
    void testDeliveryTimeoutAndLingerMsConfig() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("client.id", "testDeliveryTimeoutAndLingerMsConfig");
        producerProps.put("bootstrap.servers", "localhost:9999");

        // Rejected combination.
        producerProps.put("delivery.timeout.ms", 1000);
        producerProps.put("linger.ms", 1000);
        producerProps.put("request.timeout.ms", 1);
        Assertions.assertThrows(
                KafkaException.class,
                () -> new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()));

        // Accepted combination: only linger.ms differs, the other two values are re-applied as-is.
        producerProps.put("delivery.timeout.ms", 1000);
        producerProps.put("linger.ms", 999);
        producerProps.put("request.timeout.ms", 1);
        Assertions.assertDoesNotThrow(
                () -> new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()).close());
    }

    /**
     * Synthetic lambda body left behind by the decompiler: closes the given producer with a
     * timeout of -100 ms.
     * NOTE(review): presumably the executable of an {@code assertThrows} in
     * {@code closeWithNegativeTimestampShouldThrow}, which is outside this view - confirm.
     *
     * @param producer producer to close
     * @throws Throwable whatever {@code close} throws for the negative timeout
     */
    private static /* synthetic */ void lambda$closeWithNegativeTimestampShouldThrow$30(Producer producer) throws Throwable {
        Duration negativeTimeout = Duration.ofMillis(-100L);
        producer.close(negativeTimeout);
    }

    /**
     * Builder-style holder for the collaborators of a {@link KafkaProducer} under test.
     *
     * <p>Every collaborator defaults to a Mockito mock (or a {@link MockTime} / fresh
     * {@link Metrics}); the {@code set*} / {@code add*} methods swap individual ones and return
     * {@code this} for chaining. {@link #newKafkaProducer()} assembles the producer.
     *
     * @param <T> type used for both record keys and values
     */
    private static class KafkaProducerTestContext<T> {
        private final TestInfo testInfo;
        private final Map<String, Object> configs;
        private final Serializer<T> serializer;
        private final Partitioner partitioner = Mockito.mock(Partitioner.class);
        private final KafkaThread ioThread = Mockito.mock(KafkaThread.class);
        private final List<ProducerInterceptor<T, T>> interceptors = new ArrayList<>();
        private ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        private RecordAccumulator accumulator = Mockito.mock(RecordAccumulator.class);
        private Sender sender = Mockito.mock(Sender.class);
        private TransactionManager transactionManager = Mockito.mock(TransactionManager.class);
        private Time time = new MockTime();
        // Built from the initial clock; a later setTime() call does not rebuild it.
        private final Metrics metrics = new Metrics(time);

        /**
         * Creates a context with an empty configuration map.
         *
         * @param testInfo   JUnit test info, used for the log prefix
         * @param serializer serializer used for both keys and values
         */
        public KafkaProducerTestContext(TestInfo testInfo, Serializer<T> serializer) {
            this(testInfo, new HashMap<>(), serializer);
        }

        /**
         * Creates a context on top of the given configuration map. The map is kept by
         * reference and gets {@code bootstrap.servers=localhost:9999} added when that key is
         * missing.
         *
         * @param testInfo   JUnit test info, used for the log prefix
         * @param configs    producer configuration; may be modified as described above
         * @param serializer serializer used for both keys and values
         */
        public KafkaProducerTestContext(TestInfo testInfo, Map<String, Object> configs, Serializer<T> serializer) {
            this.testInfo = testInfo;
            this.configs = configs;
            this.serializer = serializer;
            boolean hasBootstrapServers = configs.containsKey("bootstrap.servers");
            if (!hasBootstrapServers) {
                configs.put("bootstrap.servers", "localhost:9999");
            }
        }

        /** Replaces the metadata collaborator and returns this context. */
        public KafkaProducerTestContext<T> setProducerMetadata(ProducerMetadata metadata) {
            this.metadata = metadata;
            return this;
        }

        /** Replaces the record accumulator and returns this context. */
        public KafkaProducerTestContext<T> setAccumulator(RecordAccumulator accumulator) {
            this.accumulator = accumulator;
            return this;
        }

        /** Replaces the sender and returns this context. */
        public KafkaProducerTestContext<T> setSender(Sender sender) {
            this.sender = sender;
            return this;
        }

        /** Replaces the transaction manager and returns this context. */
        public KafkaProducerTestContext<T> setTransactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /** Appends an interceptor to the list handed to the producer and returns this context. */
        public KafkaProducerTestContext<T> addInterceptor(ProducerInterceptor<T, T> interceptor) {
            interceptors.add(interceptor);
            return this;
        }

        /** Replaces the clock passed to the producer and returns this context. */
        public KafkaProducerTestContext<T> setTime(Time time) {
            this.time = time;
            return this;
        }

        /**
         * Builds a producer wired with the collaborators currently held by this context.
         *
         * @return a new producer; the log prefix contains the test's display name
         */
        public KafkaProducer<T, T> newKafkaProducer() {
            String logPrefix = "[Producer test=" + testInfo.getDisplayName() + "] ";
            LogContext logContext = new LogContext(logPrefix);
            Map<String, Object> withSerializers = ProducerConfig.appendSerializerToConfig(configs, serializer, serializer);
            ProducerConfig producerConfig = new ProducerConfig(withSerializers);
            ProducerInterceptors<T, T> producerInterceptors = new ProducerInterceptors<>(interceptors);
            return new KafkaProducer<>(
                    producerConfig,
                    logContext,
                    metrics,
                    serializer,
                    serializer,
                    metadata,
                    accumulator,
                    transactionManager,
                    sender,
                    producerInterceptors,
                    partitioner,
                    time,
                    ioThread,
                    Optional.empty());
        }
    }

    /**
     * Partitioner that always answers with a negative partition number, regardless of its
     * arguments. Configuration and close are no-ops.
     */
    public static class BuggyPartitioner
    implements Partitioner {
        /** The invalid partition number returned for every record. */
        private static final int NEGATIVE_PARTITION = -1;

        /** No configuration is read. */
        public void configure(Map<String, ?> configs) {
        }

        /**
         * Ignores every argument.
         *
         * @return always {@code -1}
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return NEGATIVE_PARTITION;
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Pass-through interceptor whose only effect is to record, at configuration time, the
     * {@code client.id} it was configured with into {@code CLIENT_IDS}.
     */
    public static class ProducerInterceptorForClientId
    implements ProducerInterceptor<byte[], byte[]> {
        /** Appends the configured {@code client.id} (as a string) to {@code CLIENT_IDS}. */
        public void configure(Map<String, ?> configs) {
            Object clientId = configs.get("client.id");
            CLIENT_IDS.add(clientId.toString());
        }

        /** Returns the record unchanged. */
        public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
            return record;
        }

        /** Acknowledgements are ignored. */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Partitioner that always selects partition 0 and records, at configuration time, the
     * {@code client.id} it was configured with into {@code CLIENT_IDS}.
     */
    public static class PartitionerForClientId
    implements Partitioner {
        /** Appends the configured {@code client.id} (as a string) to {@code CLIENT_IDS}. */
        public void configure(Map<String, ?> configs) {
            Object clientId = configs.get("client.id");
            CLIENT_IDS.add(clientId.toString());
        }

        /**
         * Ignores every argument.
         *
         * @return always {@code 0}
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            final int firstPartition = 0;
            return firstPartition;
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Identity serializer for byte arrays that records, at configuration time, the
     * {@code client.id} it was configured with into {@code CLIENT_IDS}.
     */
    public static class SerializerForClientId
    implements Serializer<byte[]> {
        /**
         * Returns the payload untouched.
         *
         * @param topic ignored
         * @param data  bytes to pass through
         * @return {@code data} itself
         */
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }

        /**
         * Appends the configured {@code client.id} (as a string) to {@code CLIENT_IDS}.
         *
         * @param configs configuration map; must contain {@code client.id}
         * @param isKey   ignored
         */
        public void configure(Map<String, ?> configs, boolean isKey) {
            Object clientId = configs.get("client.id");
            CLIENT_IDS.add(clientId.toString());
        }
    }
}

