/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class KafkaProducerTest {
    private String topic = "topic";
    private Node host1 = new Node(0, "host1", 1000);
    private Collection<Node> nodes = Collections.singletonList(this.host1);
    private final Cluster emptyCluster = new Cluster(null, this.nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
    private final Cluster onePartitionCluster = new Cluster("dummy", this.nodes, Collections.singletonList(new PartitionInfo(this.topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
    private final Cluster threePartitionCluster = new Cluster("dummy", this.nodes, Arrays.asList(new PartitionInfo(this.topic, 0, null, null, null), new PartitionInfo(this.topic, 1, null, null, null), new PartitionInfo(this.topic, 2, null, null, null)), Collections.emptySet(), Collections.emptySet());
    private final int defaultMetadataIdleMs = 300000;

    @Test
    public void testOverwriteAcksAndRetriesForIdempotentProducers() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("transactional.id", "transactionalId");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        ProducerConfig config = new ProducerConfig(props);
        Assert.assertTrue((boolean)config.getBoolean("enable.idempotence"));
        Assert.assertTrue((boolean)Stream.of("-1", "all").anyMatch(each -> each.equalsIgnoreCase(config.getString("acks"))));
        Assert.assertEquals((long)config.getInt("retries").intValue(), (long)Integer.MAX_VALUE);
        Assert.assertTrue((boolean)config.getString("client.id").equalsIgnoreCase("producer-" + config.getString("transactional.id")));
    }

    @Test
    public void testMetricsReporterAutoGeneratedClientId() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter)producer.metrics.reporters().get(0);
        Assert.assertEquals((Object)producer.getClientId(), (Object)mockMetricsReporter.clientId);
        producer.close();
    }

    @Test
    public void testConstructorWithSerializers() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer()).close();
    }

    @Test(expected=ConfigException.class)
    public void testNoSerializerProvided() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(producerProps);
    }

    @Test
    public void testConstructorFailureCloseResource() {
        Properties props = new Properties();
        props.setProperty("client.id", "testConstructorClose");
        props.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
        try (KafkaProducer ignored = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Assert.fail((String)"should have caught an exception and returned");
        }
        catch (KafkaException e) {
            Assert.assertEquals((long)(oldInitCount + 1), (long)MockMetricsReporter.INIT_COUNT.get());
            Assert.assertEquals((long)(oldCloseCount + 1), (long)MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals((Object)"Failed to construct kafka producer", (Object)e.getMessage());
        }
    }

    @Test
    public void testConstructorWithNotStringKey() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.put((Object)1, "not string key");
        try (KafkaProducer ff = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());){
            Assert.fail((String)"Constructor should throw exception");
        }
        catch (ConfigException e) {
            Assert.assertTrue((String)("Unexpected exception message: " + e.getMessage()), (boolean)e.getMessage().contains("not string key"));
        }
    }

    @Test
    public void testSerializerClose() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("client.id", "testConstructorClose");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("metric.reporters", MockMetricsReporter.class.getName());
        configs.put("security.protocol", "PLAINTEXT");
        int oldInitCount = MockSerializer.INIT_COUNT.get();
        int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new MockSerializer(), (Serializer)new MockSerializer());
        Assert.assertEquals((long)(oldInitCount + 2), (long)MockSerializer.INIT_COUNT.get());
        Assert.assertEquals((long)oldCloseCount, (long)MockSerializer.CLOSE_COUNT.get());
        producer.close();
        Assert.assertEquals((long)(oldInitCount + 2), (long)MockSerializer.INIT_COUNT.get());
        Assert.assertEquals((long)(oldCloseCount + 2), (long)MockSerializer.CLOSE_COUNT.get());
    }

    @Test
    public void testInterceptorConstructClose() {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
            props.setProperty("mock.interceptor.append", "something");
            KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals((long)0L, (long)MockProducerInterceptor.CLOSE_COUNT.get());
            Assert.assertNull((Object)MockProducerInterceptor.CLUSTER_META.get());
            producer.close();
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.CLOSE_COUNT.get());
        }
        finally {
            MockProducerInterceptor.resetCounters();
        }
    }

    @Test
    public void testPartitionerClose() {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            MockPartitioner.resetCounters();
            props.setProperty("partitioner.class", MockPartitioner.class.getName());
            KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            Assert.assertEquals((long)1L, (long)MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals((long)0L, (long)MockPartitioner.CLOSE_COUNT.get());
            producer.close();
            Assert.assertEquals((long)1L, (long)MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals((long)1L, (long)MockPartitioner.CLOSE_COUNT.get());
        }
        finally {
            MockPartitioner.resetCounters();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("partitioner.class", MockPartitioner.class.getName());
        configs.put("batch.size", "1");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference closeException = new AtomicReference();
        try {
            Future<?> future = executor.submit(() -> KafkaProducerTest.lambda$shouldCloseProperlyAndThrowIfInterrupted$1((Producer)producer, closeException));
            try {
                future.get(100L, TimeUnit.MILLISECONDS);
                Assert.fail((String)"Close completed without waiting for send");
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
            client.waitForRequests(1, 1000L);
            Assert.assertTrue((String)"Close terminated prematurely", (boolean)future.cancel(true));
            TestUtils.waitForCondition(() -> closeException.get() != null, "InterruptException did not occur within timeout.");
            Assert.assertTrue((String)("Expected exception not thrown " + closeException), (boolean)(closeException.get() instanceof InterruptException));
        }
        finally {
            executor.shutdownNow();
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -1);
        config.put("receive.buffer.bytes", -1);
        new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer()).close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -2);
        new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("receive.buffer.bytes", -2);
        new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    @Test
    public void testMetadataFetch() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        ProducerMetadata metadata = (ProducerMetadata)Mockito.mock(ProducerMetadata.class);
        Mockito.when((Object)metadata.fetch()).thenReturn((Object)this.emptyCluster, (Object[])new Cluster[]{this.emptyCluster, this.emptyCluster, this.emptyCluster, this.onePartitionCluster});
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient(Time.SYSTEM, (Metadata)metadata), null, Time.SYSTEM){

            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.this.newMetadata(0L, 100000L));
            }
        };
        ProducerRecord record = new ProducerRecord(this.topic, (Object)"value");
        producer.send(record);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)5))).fetch();
        producer.send(record, null);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)6))).fetch();
        producer.partitionsFor(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)7))).fetch();
        producer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testMetadataExpiry() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        ProducerMetadata metadata = (ProducerMetadata)Mockito.mock(ProducerMetadata.class);
        Cluster emptyCluster = new Cluster("dummy", Collections.singletonList(this.host1), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        Mockito.when((Object)metadata.fetch()).thenReturn((Object)this.onePartitionCluster, (Object[])new Cluster[]{emptyCluster, this.onePartitionCluster});
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient(Time.SYSTEM, (Metadata)metadata), null, Time.SYSTEM){

            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.this.newMetadata(0L, 100000L));
            }
        };
        ProducerRecord record = new ProducerRecord(this.topic, (Object)"value");
        producer.send(record);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)0))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)0))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)1))).fetch();
        producer.send(record, null);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)1))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)1))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)3))).fetch();
        producer.close(Duration.ofMillis(0L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMetadataTimeoutWithMissingTopic() throws Exception {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        ProducerRecord record = new ProducerRecord(this.topic, Integer.valueOf(2), null, (Object)"value");
        ProducerMetadata metadata = (ProducerMetadata)Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when((Object)metadata.fetch()).then(invocation -> {
            invocationCount.incrementAndGet();
            if (invocationCount.get() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return this.emptyCluster;
        });
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient(Time.SYSTEM, (Metadata)metadata), null, (Time)mockTime){

            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.this.newMetadata(0L, 100000L));
            }
        };
        Future future = producer.send(record);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)5))).fetch();
        try {
            future.get();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException));
        }
        finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    @Test
    public void testMetadataWithPartitionOutOfRange() throws Exception {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        ProducerRecord record = new ProducerRecord(this.topic, Integer.valueOf(2), null, (Object)"value");
        ProducerMetadata metadata = (ProducerMetadata)Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        Mockito.when((Object)metadata.fetch()).thenReturn((Object)this.onePartitionCluster, (Object[])new Cluster[]{this.onePartitionCluster, this.threePartitionCluster});
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient(Time.SYSTEM, (Metadata)metadata), null, (Time)mockTime){

            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.this.newMetadata(0L, 100000L));
            }
        };
        producer.send(record);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)2))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)2))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)3))).fetch();
        producer.close(Duration.ofMillis(0L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        ProducerRecord record = new ProducerRecord(this.topic, Integer.valueOf(2), null, (Object)"value");
        ProducerMetadata metadata = (ProducerMetadata)Mockito.mock(ProducerMetadata.class);
        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when((Object)metadata.fetch()).then(invocation -> {
            invocationCount.incrementAndGet();
            if (invocationCount.get() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return this.onePartitionCluster;
        });
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient(Time.SYSTEM, (Metadata)metadata), null, (Time)mockTime){

            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                return super.newSender(logContext, kafkaClient, KafkaProducerTest.this.newMetadata(0L, 100000L));
            }
        };
        Future future = producer.send(record);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).requestUpdateForTopic(this.topic);
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)4))).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        ((ProducerMetadata)Mockito.verify((Object)metadata, (VerificationMode)Mockito.times((int)5))).fetch();
        try {
            future.get();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException));
        }
        finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    @Test
    public void testTopicRefreshInMetadata() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "600000");
        long refreshBackoffMs = 500L;
        long metadataExpireMs = 60000L;
        long metadataIdleMs = 60000L;
        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, metadataExpireMs, metadataIdleMs, new LogContext(), new ClusterResourceListeners(), (Time)time);
        String topic = "topic";
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient((Time)time, (Metadata)metadata), null, (Time)time);){
            AtomicBoolean running = new AtomicBoolean(true);
            Thread t = new Thread(() -> {
                long startTimeMs = System.currentTimeMillis();
                while (running.get()) {
                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 100L) {
                        Thread.yield();
                    }
                    MetadataResponse updateResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("topic", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
                    time.sleep(60000L);
                }
            });
            t.start();
            Assert.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.partitionsFor("topic"));
            running.set(false);
            t.join();
        }
    }

    @Test
    public void testTopicExpiryInMetadata() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "30000");
        long refreshBackoffMs = 500L;
        long metadataExpireMs = 60000L;
        long metadataIdleMs = 60000L;
        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, metadataExpireMs, metadataIdleMs, new LogContext(), new ClusterResourceListeners(), (Time)time);
        String topic = "topic";
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)new MockClient((Time)time, (Metadata)metadata), null, (Time)time);){
            Exchanger<Object> exchanger = new Exchanger<Object>();
            Thread t = new Thread(() -> {
                try {
                    exchanger.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    MetadataResponse updateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
                    exchanger.exchange(null);
                    time.sleep(120000L);
                    updateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
                    exchanger.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    time.sleep(30000L);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            t.start();
            exchanger.exchange(null);
            Assert.assertNotNull((Object)producer.partitionsFor("topic"));
            exchanger.exchange(null);
            exchanger.exchange(null);
            Assert.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> producer.partitionsFor("topic"));
            t.join();
        }
    }

    @Test
    @Deprecated
    public void testHeadersWithExtendedClasses() {
        this.doTestHeaders(ExtendedSerializer.class);
    }

    @Test
    public void testHeaders() {
        this.doTestHeaders(Serializer.class);
    }

    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        Serializer keySerializer = (Serializer)Mockito.mock(serializerClassToMock);
        Serializer valueSerializer = (Serializer)Mockito.mock(serializerClassToMock);
        long nowMs = Time.SYSTEM.milliseconds();
        String topic = "topic";
        ProducerMetadata metadata = this.newMetadata(0L, 90000L);
        metadata.add(topic, nowMs);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, nowMs);
        KafkaProducer producer = new KafkaProducer(configs, keySerializer, valueSerializer, metadata, null, null, Time.SYSTEM);
        Mockito.when((Object)keySerializer.serialize((String)ArgumentMatchers.any(), (Headers)ArgumentMatchers.any(), ArgumentMatchers.any())).then(invocation -> ((String)invocation.getArgument(2)).getBytes());
        Mockito.when((Object)valueSerializer.serialize((String)ArgumentMatchers.any(), (Headers)ArgumentMatchers.any(), ArgumentMatchers.any())).then(invocation -> ((String)invocation.getArgument(2)).getBytes());
        String value = "value";
        String key = "key";
        ProducerRecord record = new ProducerRecord(topic, (Object)key, (Object)value);
        record.headers().add((Header)new RecordHeader("test", "header2".getBytes()));
        producer.send(record, null);
        Assert.assertThrows(IllegalStateException.class, () -> record.headers().add((Header)new RecordHeader("test", "test".getBytes())));
        Assert.assertArrayEquals((byte[])record.headers().lastHeader("test").value(), (byte[])"header2".getBytes());
        ((Serializer)Mockito.verify((Object)valueSerializer)).serialize(topic, record.headers(), (Object)value);
        ((Serializer)Mockito.verify((Object)keySerializer)).serialize(topic, record.headers(), (Object)key);
        producer.close(Duration.ofMillis(0L));
    }

    @Test
    public void closeShouldBeIdempotent() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        producer.close();
        producer.close();
    }

    @Test
    public void closeWithNegativeTimestampShouldThrow() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Assert.assertThrows(IllegalArgumentException.class, () -> KafkaProducerTest.lambda$closeWithNegativeTimestampShouldThrow$12((Producer)producer));
        }
    }

    @Test
    public void testFlushCompleteSendOfInflightBatches() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            ArrayList<Future> futureResponses = new ArrayList<Future>();
            for (int i = 0; i < 50; ++i) {
                Future response = producer.send(new ProducerRecord("topic", (Object)("value" + i)));
                futureResponses.add(response);
            }
            futureResponses.forEach(res -> Assert.assertTrue((!res.isDone() ? 1 : 0) != 0));
            producer.flush();
            futureResponses.forEach(res -> Assert.assertTrue((boolean)res.isDone()));
        }
    }

    @Test
    public void testMetricConfigRecordingLevel() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Assert.assertEquals((Object)Sensor.RecordingLevel.INFO, (Object)producer.metrics.config().recordLevel());
        }
        props.put("metrics.recording.level", "DEBUG");
        producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        var3_3 = null;
        try {
            Assert.assertEquals((Object)Sensor.RecordingLevel.DEBUG, (Object)producer.metrics.config().recordLevel());
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (producer != null) {
                if (var3_3 != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.request.size", "1");
        String topic = "topic";
        ProducerRecord record = new ProducerRecord(topic, (Object)"value");
        long nowMs = Time.SYSTEM.milliseconds();
        ProducerMetadata metadata = this.newMetadata(0L, 90000L);
        metadata.add(topic, nowMs);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, nowMs);
        ProducerInterceptors interceptors = (ProducerInterceptors)Mockito.mock(ProducerInterceptors.class);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);
        Mockito.when((Object)interceptors.onSend((ProducerRecord)ArgumentMatchers.any())).then(invocation -> invocation.getArgument(0));
        producer.send(record);
        ((ProducerInterceptors)Mockito.verify((Object)interceptors)).onSend(record);
        ((ProducerInterceptors)Mockito.verify((Object)interceptors)).onSendError((ProducerRecord)ArgumentMatchers.eq((Object)record), (TopicPartition)ArgumentMatchers.notNull(), (Exception)ArgumentMatchers.notNull());
        producer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testPartitionsForWithNullTopic() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9000");
        try (KafkaProducer producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Assert.assertThrows(NullPointerException.class, () -> producer.partitionsFor(null));
        }
    }

    @Test
    public void testInitTransactionTimeout() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 500);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            client.prepareResponse(request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), (AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
            Assert.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> ((Producer)producer).initTransactions());
            client.prepareResponse(request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), (AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
            client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
            producer.initTransactions();
        }
    }

    @Test
    public void testInitTransactionWhileThrottled() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        Node node = (Node)metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            producer.initTransactions();
        }
    }

    @Test
    public void testAbortTransaction() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        client.prepareResponse((AbstractResponse)this.endTxnResponse(Errors.NONE));
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            producer.initTransactions();
            producer.beginTransaction();
            producer.abortTransaction();
        }
    }

    @Test
    public void testSendTxnOffsetsWithGroupId() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        Node node = (Node)metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        client.prepareResponse((AbstractResponse)this.addOffsetsToTxnResponse(Errors.NONE));
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        String groupId = "group";
        client.prepareResponse(request -> ((TxnOffsetCommitRequest)request).data.groupId().equals(groupId), (AbstractResponse)this.txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
        client.prepareResponse((AbstractResponse)this.endTxnResponse(Errors.NONE));
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            producer.initTransactions();
            producer.beginTransaction();
            producer.sendOffsetsToTransaction(Collections.emptyMap(), groupId);
            producer.commitTransaction();
        }
    }

    @Test
    public void testSendTxnOffsetsWithGroupMetadata() {
        this.sendOffsetsWithGroupMetadata((short)3);
    }

    @Test
    public void testSendTxnOffsetsWithGroupMetadataDowngrade() {
        this.sendOffsetsWithGroupMetadata((short)2);
    }

    private void sendOffsetsWithGroupMetadata(short maxVersion) {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("internal.auto.downgrade.txn.commit", true);
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        client.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.TXN_OFFSET_COMMIT.id, (short)0, (short)maxVersion));
        Node node = (Node)metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        client.prepareResponse((AbstractResponse)this.addOffsetsToTxnResponse(Errors.NONE));
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        String groupId = "group";
        String memberId = "member";
        int generationId = 5;
        String groupInstanceId = "instance";
        client.prepareResponse(request -> {
            TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest)request).data;
            if (maxVersion < 3) {
                return data.groupId().equals(groupId) && data.memberId().equals("") && data.generationId() == -1 && data.groupInstanceId() == null;
            }
            return data.groupId().equals(groupId) && data.memberId().equals(memberId) && data.generationId() == generationId && data.groupInstanceId().equals(groupInstanceId);
        }, (AbstractResponse)this.txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
        client.prepareResponse((AbstractResponse)this.endTxnResponse(Errors.NONE));
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            producer.initTransactions();
            producer.beginTransaction();
            ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId, generationId, memberId, Optional.of(groupInstanceId));
            producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata);
            producer.commitTransaction();
        }
    }

    @Test
    public void testNullGroupMetadataInSendOffsets() {
        this.verifyInvalidGroupMetadata(null);
    }

    @Test
    public void testInvalidGenerationIdAndMemberIdCombinedInSendOffsets() {
        this.verifyInvalidGroupMetadata(new ConsumerGroupMetadata("group", 2, "", Optional.empty()));
    }

    private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        client.updateMetadata(initialUpdateResponse);
        Node node = (Node)metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        client.prepareResponse((AbstractResponse)this.initProducerIdResponse(1L, (short)5, Errors.NONE));
        try (KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);){
            producer.initTransactions();
            producer.beginTransaction();
            Assert.assertThrows(IllegalArgumentException.class, () -> KafkaProducerTest.lambda$verifyInvalidGroupMetadata$21((Producer)producer, groupMetadata));
        }
    }

    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData().setErrorCode(error.code()).setProducerEpoch(producerEpoch).setProducerId(producerId).setThrottleTimeMs(0);
        return new InitProducerIdResponse(responseData);
    }

    private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors error) {
        return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData().setErrorCode(error.code()).setThrottleTimeMs(10));
    }

    private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> errorMap) {
        return new TxnOffsetCommitResponse(10, errorMap);
    }

    private EndTxnResponse endTxnResponse(Errors error) {
        return new EndTxnResponse(new EndTxnResponseData().setErrorCode(error.code()).setThrottleTimeMs(0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 5);
        configs.put("bootstrap.servers", "localhost:9000");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        Assert.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> ((Producer)producer).initTransactions());
        try {
            Assert.assertThrows(KafkaException.class, () -> ((Producer)producer).beginTransaction());
        }
        finally {
            producer.close(Duration.ofMillis(0L));
        }
    }

    @Test
    public void testSendToInvalidTopic() throws Exception {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "15000");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        String invalidTopicName = "topic abc";
        ProducerRecord record = new ProducerRecord(invalidTopicName, (Object)"HelloKafka");
        ArrayList<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<MetadataResponse.TopicMetadata>();
        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, invalidTopicName, false, Collections.emptyList()));
        MetadataResponse updateResponse = MetadataResponse.prepareResponse(new ArrayList(initialUpdateResponse.brokers()), (String)initialUpdateResponse.clusterId(), (int)initialUpdateResponse.controller().id(), topicMetadata);
        client.prepareMetadataUpdate(updateResponse);
        Future future = producer.send(record);
        Assert.assertEquals((String)"Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName), (Object)metadata.fetch().invalidTopics());
        TestUtils.assertFutureError(future, InvalidTopicException.class);
        producer.close(Duration.ofMillis(0L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("max.block.ms", Long.MAX_VALUE);
        configs.put("bootstrap.servers", "localhost:9000");
        String topicName = "test";
        Time time = Time.SYSTEM;
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = new ProducerMetadata(0L, Long.MAX_VALUE, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners(), time);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, time);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference sendException = new AtomicReference();
        try {
            executor.submit(() -> KafkaProducerTest.lambda$testCloseWhenWaitingForMetadataUpdate$22((Producer)producer, topicName, sendException));
            TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata");
            producer.close(Duration.ofMillis(0L));
            TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
            Assert.assertEquals(KafkaException.class, ((Exception)sendException.get()).getClass());
        }
        finally {
            executor.shutdownNow();
        }
    }

    @Test
    public void testTransactionalMethodThrowsWhenSenderClosed() {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        producer.close();
        Assert.assertThrows(IllegalStateException.class, () -> ((Producer)producer).initTransactions());
    }

    @Test
    public void testCloseIsForcedOnPendingFindCoordinator() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch assertionDoneLatch = new CountDownLatch(1);
        executorService.submit(() -> KafkaProducerTest.lambda$testCloseIsForcedOnPendingFindCoordinator$25((Producer)producer, assertionDoneLatch));
        client.waitForRequests(1, 2000L);
        producer.close(Duration.ofMillis(1000L));
        assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testCloseIsForcedOnPendingInitProducerId() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch assertionDoneLatch = new CountDownLatch(1);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        executorService.submit(() -> KafkaProducerTest.lambda$testCloseIsForcedOnPendingInitProducerId$26((Producer)producer, assertionDoneLatch));
        client.waitForRequests(1, 2000L);
        producer.close(Duration.ofMillis(1000L));
        assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testCloseIsForcedOnPendingAddOffsetRequest() throws InterruptedException {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");
        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = this.newMetadata(0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient((Time)time, (Metadata)metadata);
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), metadata, (KafkaClient)client, null, (Time)time);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch assertionDoneLatch = new CountDownLatch(1);
        client.prepareResponse((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (Node)this.host1));
        executorService.submit(() -> KafkaProducerTest.lambda$testCloseIsForcedOnPendingAddOffsetRequest$27((Producer)producer, assertionDoneLatch));
        client.waitForRequests(1, 2000L);
        producer.close(Duration.ofMillis(1000L));
        assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testProducerJmxPrefix() throws Exception {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("client.id", "client-1");
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        MetricName testMetricName = producer.metrics.metricName("test-metric", "grp1", "test metric");
        producer.metrics.addMetric(testMetricName, (Measurable)new Avg());
        Assert.assertNotNull((Object)server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
        producer.close();
    }

    private ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) {
        return new ProducerMetadata(refreshBackoffMs, expirationMs, 300000L, new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
    }

    @Test
    public void serializerShouldSeeGeneratedClientId() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("key.serializer", SerializerForClientId.class.getName());
        props.put("value.serializer", SerializerForClientId.class.getName());
        KafkaProducer producer = new KafkaProducer(props);
        Assert.assertEquals((long)2L, (long)SerializerForClientId.CLIENT_IDS.size());
        Assert.assertEquals((Object)SerializerForClientId.CLIENT_IDS.get(0), (Object)producer.getClientId());
        Assert.assertEquals((Object)SerializerForClientId.CLIENT_IDS.get(1), (Object)producer.getClientId());
        producer.close();
    }

    private static /* synthetic */ void lambda$testCloseIsForcedOnPendingAddOffsetRequest$27(Producer producer, CountDownLatch assertionDoneLatch) {
        Assert.assertThrows(KafkaException.class, () -> ((Producer)producer).initTransactions());
        assertionDoneLatch.countDown();
    }

    private static /* synthetic */ void lambda$testCloseIsForcedOnPendingInitProducerId$26(Producer producer, CountDownLatch assertionDoneLatch) {
        Assert.assertThrows(KafkaException.class, () -> ((Producer)producer).initTransactions());
        assertionDoneLatch.countDown();
    }

    private static /* synthetic */ void lambda$testCloseIsForcedOnPendingFindCoordinator$25(Producer producer, CountDownLatch assertionDoneLatch) {
        Assert.assertThrows(KafkaException.class, () -> ((Producer)producer).initTransactions());
        assertionDoneLatch.countDown();
    }

    private static /* synthetic */ void lambda$testCloseWhenWaitingForMetadataUpdate$22(Producer producer, String topicName, AtomicReference sendException) {
        try {
            producer.send(new ProducerRecord(topicName, (Object)"key", (Object)"value"));
            Assert.fail();
        }
        catch (Exception e) {
            sendException.set(e);
        }
    }

    private static /* synthetic */ void lambda$verifyInvalidGroupMetadata$21(Producer producer, ConsumerGroupMetadata groupMetadata) throws Throwable {
        producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata);
    }

    private static /* synthetic */ void lambda$closeWithNegativeTimestampShouldThrow$12(Producer producer) throws Throwable {
        producer.close(Duration.ofMillis(-100L));
    }

    private static /* synthetic */ void lambda$shouldCloseProperlyAndThrowIfInterrupted$1(Producer producer, AtomicReference closeException) {
        producer.send(new ProducerRecord("topic", (Object)"key", (Object)"value"));
        try {
            producer.close();
            Assert.fail((String)"Close should block and throw.");
        }
        catch (Exception e) {
            closeException.set(e);
        }
    }

    public static class SerializerForClientId
    implements Serializer<byte[]> {
        static final List<String> CLIENT_IDS = new ArrayList<String>();

        public void configure(Map<String, ?> configs, boolean isKey) {
            CLIENT_IDS.add(configs.get("client.id").toString());
        }

        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }
}

