package org.apache.kafka.clients.producer;

import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.AbstractConfigTest;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;

/* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest.class */
public class KafkaProducerTest {
    /** Topic name; the same literal is used by the partitioned fixture clusters below. */
    private final String topic = "topic";

    /** The single-node collection shared by every fixture cluster. */
    private final Collection<Node> nodes = Collections.singletonList(NODE);

    /** Cluster with a null cluster id, the shared node and no partition entries. */
    private final Cluster emptyCluster = new Cluster(
        (String) null,
        this.nodes,
        Collections.emptySet(),
        Collections.emptySet(),
        Collections.emptySet());

    /** Cluster "dummy" with a single partition (0) of "topic"; leader and replica arrays are null. */
    private final Cluster onePartitionCluster = new Cluster(
        "dummy",
        this.nodes,
        Collections.singletonList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null)),
        Collections.emptySet(),
        Collections.emptySet());

    /** Cluster "dummy" with partitions 0, 1 and 2 of "topic"; leader and replica arrays are null. */
    private final Cluster threePartitionCluster = new Cluster(
        "dummy",
        this.nodes,
        Arrays.asList(
            new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null),
            new PartitionInfo("topic", 1, (Node) null, (Node[]) null, (Node[]) null),
            new PartitionInfo("topic", 2, (Node) null, (Node[]) null, (Node[]) null)),
        Collections.emptySet(),
        Collections.emptySet());

    /** Test metadata for the running test; assigned in {@code setup}. */
    private TestInfo testInfo;

    // NOTE(review): presumably milliseconds (5 minutes), judging by the name - confirm against usages.
    private static final int DEFAULT_METADATA_IDLE_MS = 300000;

    /** The only broker node used by the fixture clusters. */
    private static final Node NODE = new Node(0, "host1", 1000);

    /**
     * Collects the {@code client.id} values observed by the {@code *ForClientId} helper classes
     * in their {@code configure} callbacks. Uses the diamond operator instead of a raw
     * {@code ArrayList} so the declaration is free of unchecked warnings.
     */
    private static final List<String> CLIENT_IDS = new ArrayList<>();

    /* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest$BuggyPartitioner.class */
    /**
     * Partitioner that always answers with partition {@code -1}, whatever record it is given.
     */
    public static class BuggyPartitioner implements Partitioner {
        /**
         * Ignores every argument.
         *
         * @return always {@code -1}
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return -1;
        }

        /** No resources are held, so there is nothing to release. */
        public void close() {
        }

        /** Accepts any configuration without reading it. */
        public void configure(Map<String, ?> configs) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest$KafkaProducerTestContext.class */
    /**
     * Builder-style holder for the collaborators handed to the {@link KafkaProducer} constructor
     * in {@link #newKafkaProducer()}. Every collaborator starts out as a Mockito mock (or a
     * {@link MockTime}) and can be swapped through the fluent setters before the producer is built.
     *
     * @param <T> type used for both the key and the value serializer
     */
    public static class KafkaProducerTestContext<T> {
        private final TestInfo testInfo;
        private final Map<String, Object> configs;
        private final Serializer<T> serializer;
        private final Partitioner partitioner = Mockito.mock(Partitioner.class);
        private final KafkaThread ioThread = Mockito.mock(KafkaThread.class);
        private final List<ProducerInterceptor<T, T>> interceptors = new ArrayList<>();
        private ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        private RecordAccumulator accumulator = Mockito.mock(RecordAccumulator.class);
        private Sender sender = Mockito.mock(Sender.class);
        private TransactionManager transactionManager = Mockito.mock(TransactionManager.class);
        private Time time = new MockTime();
        // Bound to the initial MockTime; a later setTime(...) call does not rebuild it.
        private final Metrics metrics = new Metrics(this.time);

        /**
         * Creates a context backed by a fresh, empty configuration map.
         *
         * @param testInfo   metadata of the running test, used for the log prefix
         * @param serializer serializer used for keys and values
         */
        public KafkaProducerTestContext(TestInfo testInfo, Serializer<T> serializer) {
            this(testInfo, new HashMap<>(), serializer);
        }

        /**
         * Creates a context on top of the supplied configuration map. The map is stored as-is
         * (not copied) and gains a {@code bootstrap.servers} entry when it has none.
         *
         * @param testInfo   metadata of the running test, used for the log prefix
         * @param configs    producer configuration; may be mutated by this constructor
         * @param serializer serializer used for keys and values
         */
        public KafkaProducerTestContext(TestInfo testInfo, Map<String, Object> configs, Serializer<T> serializer) {
            this.testInfo = testInfo;
            this.configs = configs;
            this.serializer = serializer;
            if (!configs.containsKey("bootstrap.servers")) {
                configs.put("bootstrap.servers", "localhost:9999");
            }
        }

        /** Replaces the mocked metadata; returns {@code this} for chaining. */
        public KafkaProducerTestContext<T> setProducerMetadata(ProducerMetadata producerMetadata) {
            metadata = producerMetadata;
            return this;
        }

        /** Replaces the mocked record accumulator; returns {@code this} for chaining. */
        public KafkaProducerTestContext<T> setAccumulator(RecordAccumulator recordAccumulator) {
            accumulator = recordAccumulator;
            return this;
        }

        /** Replaces the mocked sender; returns {@code this} for chaining. */
        public KafkaProducerTestContext<T> setSender(Sender sender) {
            this.sender = sender;
            return this;
        }

        /** Replaces the mocked transaction manager; returns {@code this} for chaining. */
        public KafkaProducerTestContext<T> setTransactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /** Appends an interceptor to the list passed to the producer; returns {@code this}. */
        public KafkaProducerTestContext<T> addInterceptor(ProducerInterceptor<T, T> producerInterceptor) {
            interceptors.add(producerInterceptor);
            return this;
        }

        /** Replaces the clock handed to the producer; returns {@code this} for chaining. */
        public KafkaProducerTestContext<T> setTime(Time time) {
            this.time = time;
            return this;
        }

        /**
         * Builds a producer from the current state of this context.
         *
         * @return a new {@link KafkaProducer} wired with the configured collaborators
         */
        public KafkaProducer<T, T> newKafkaProducer() {
            ProducerConfig producerConfig = new ProducerConfig(
                ProducerConfig.appendSerializerToConfig(configs, serializer, serializer));
            LogContext logContext = new LogContext("[Producer test=" + testInfo.getDisplayName() + "] ");
            ProducerInterceptors<T, T> producerInterceptors = new ProducerInterceptors<>(interceptors);
            return new KafkaProducer<>(
                producerConfig,
                logContext,
                metrics,
                serializer,
                serializer,
                metadata,
                accumulator,
                transactionManager,
                sender,
                producerInterceptors,
                partitioner,
                time,
                ioThread,
                Optional.empty());
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest$PartitionerForClientId.class */
    /**
     * Partitioner that records the {@code client.id} it is configured with in
     * {@code KafkaProducerTest.CLIENT_IDS} and always picks partition 0.
     */
    public static class PartitionerForClientId implements Partitioner {
        /**
         * Ignores every argument.
         *
         * @return always {@code 0}
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return 0;
        }

        /** Nothing to release. */
        public void close() {
        }

        /**
         * Records the configured client id.
         *
         * @param configs configuration map; must contain a non-null {@code client.id} entry
         */
        public void configure(Map<String, ?> configs) {
            Object clientId = configs.get("client.id");
            KafkaProducerTest.CLIENT_IDS.add(clientId.toString());
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest$ProducerInterceptorForClientId.class */
    /**
     * Pass-through interceptor that records the {@code client.id} it is configured with in
     * {@code KafkaProducerTest.CLIENT_IDS}.
     */
    public static class ProducerInterceptorForClientId implements ProducerInterceptor<byte[], byte[]> {
        /**
         * Hands the record back untouched.
         *
         * @param record the record about to be sent
         * @return the same {@code record} instance
         */
        public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
            return record;
        }

        /** Acknowledgements are ignored. */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }

        /** Nothing to release. */
        public void close() {
        }

        /**
         * Records the configured client id.
         *
         * @param configs configuration map; must contain a non-null {@code client.id} entry
         */
        public void configure(Map<String, ?> configs) {
            Object clientId = configs.get("client.id");
            KafkaProducerTest.CLIENT_IDS.add(clientId.toString());
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest$SerializerForClientId.class */
    /**
     * Identity serializer for byte arrays that records the {@code client.id} it is configured
     * with in {@code KafkaProducerTest.CLIENT_IDS}.
     */
    public static class SerializerForClientId implements Serializer<byte[]> {
        /**
         * Records the configured client id.
         *
         * @param configs configuration map; must contain a non-null {@code client.id} entry
         * @param isKey   unused
         */
        public void configure(Map<String, ?> configs, boolean isKey) {
            Object clientId = configs.get("client.id");
            KafkaProducerTest.CLIENT_IDS.add(clientId.toString());
        }

        /**
         * Returns the payload unchanged.
         *
         * @param topic unused
         * @param data  bytes to "serialize"
         * @return the same {@code data} array
         */
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /**
     * Builds a {@link KafkaProducer} from a raw config map plus explicit collaborators.
     * The serializers are first merged into the config via
     * {@code ProducerConfig.appendSerializerToConfig} and then also passed to the producer directly.
     *
     * @return the newly constructed producer
     */
    private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2, ProducerMetadata producerMetadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> producerInterceptors, Time time) {
        ProducerConfig producerConfig = new ProducerConfig(
            ProducerConfig.appendSerializerToConfig(map, serializer, serializer2));
        return new KafkaProducer<>(
            producerConfig,
            serializer,
            serializer2,
            producerMetadata,
            kafkaClient,
            producerInterceptors,
            time);
    }

    /**
     * Stores the {@link TestInfo} supplied for the upcoming test on this instance.
     *
     * @param testInfo metadata describing the test about to run
     */
    @BeforeEach
    public void setup(TestInfo testInfo) {
        this.testInfo = testInfo;
    }

    /**
     * With only a {@code transactional.id} configured, the resulting {@link ProducerConfig} must
     * report idempotence enabled, acks equal to "-1"/"all", retries at {@link Integer#MAX_VALUE}
     * and a client id of {@code "producer-" + transactional.id}.
     */
    @Test
    public void testOverwriteAcksAndRetriesForIdempotentProducers() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("transactional.id", "transactionalId");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        ProducerConfig config = new ProducerConfig(props);

        Assertions.assertTrue(config.getBoolean("enable.idempotence").booleanValue());
        // Stream.of("-1", "all") gives a Stream<String>; the previous (Object[]) cast produced a
        // Stream<Object>, on which equalsIgnoreCase is not available.
        String acks = config.getString("acks");
        Assertions.assertTrue(Stream.of("-1", "all").anyMatch(candidate -> candidate.equalsIgnoreCase(acks)));
        // JUnit convention: expected value first, actual value second.
        Assertions.assertEquals(Integer.MAX_VALUE, config.getInt("retries").intValue());
        Assertions.assertTrue(config.getString("client.id").equalsIgnoreCase("producer-" + config.getString("transactional.id")));
    }

    /**
     * Exercises the interaction between {@code acks}, {@code enable.idempotence} and
     * {@code transactional.id} in {@link ProducerConfig}: six combinations that must be accepted
     * (with the asserted effective values) followed by three that must raise {@link ConfigException}.
     */
    @Test
    public void testAcksAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // acks=0, idempotence explicitly off
        Properties acksZeroIdempotenceOff = new Properties();
        acksZeroIdempotenceOff.putAll(baseProps);
        acksZeroIdempotenceOff.setProperty("acks", "0");
        acksZeroIdempotenceOff.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(acksZeroIdempotenceOff);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be overwritten");
        Assertions.assertEquals("0", config.getString("acks"), "acks should be overwritten");

        // only transactional.id set
        Properties transactionalOnly = new Properties();
        transactionalOnly.putAll(baseProps);
        transactionalOnly.setProperty("transactional.id", "transactionalId");
        config = new ProducerConfig(transactionalOnly);
        Assertions.assertTrue(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be set with the default value");
        Assertions.assertEquals("-1", config.getString("acks"), "acks should be set with the default value");

        // acks=all, idempotence explicitly off; "all" is expected back as "-1"
        Properties acksAllIdempotenceOff = new Properties();
        acksAllIdempotenceOff.putAll(baseProps);
        acksAllIdempotenceOff.setProperty("acks", "all");
        acksAllIdempotenceOff.setProperty("enable.idempotence", "false");
        config = new ProducerConfig(acksAllIdempotenceOff);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be overwritten");
        Assertions.assertEquals("-1", config.getString("acks"), "acks should be overwritten");

        // acks=0, idempotence left unset
        Properties acksZeroOnly = new Properties();
        acksZeroOnly.putAll(baseProps);
        acksZeroOnly.setProperty("acks", "0");
        config = new ProducerConfig(acksZeroOnly);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
        Assertions.assertEquals("0", config.getString("acks"), "acks should be set with overridden value");

        // acks=1, idempotence left unset
        Properties acksOneOnly = new Properties();
        acksOneOnly.putAll(baseProps);
        acksOneOnly.setProperty("acks", "1");
        config = new ProducerConfig(acksOneOnly);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
        Assertions.assertEquals("1", config.getString("acks"), "acks should be set with overridden value");

        // transactional.id together with idempotence explicitly off must be rejected
        Properties transactionalWithoutIdempotence = new Properties();
        transactionalWithoutIdempotence.putAll(baseProps);
        transactionalWithoutIdempotence.setProperty("acks", "0");
        transactionalWithoutIdempotence.setProperty("enable.idempotence", "false");
        transactionalWithoutIdempotence.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithoutIdempotence),
            "Cannot set a transactional.id without also enabling idempotence");

        // idempotence explicitly on with acks=1 must be rejected
        Properties idempotentWithAcksOne = new Properties();
        idempotentWithAcksOne.putAll(baseProps);
        idempotentWithAcksOne.setProperty("acks", "1");
        idempotentWithAcksOne.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(idempotentWithAcksOne),
            "Must set acks to all in order to use the idempotent producer");

        // transactional.id with acks=0 must be rejected
        Properties transactionalWithAcksZero = new Properties();
        transactionalWithAcksZero.putAll(baseProps);
        transactionalWithAcksZero.setProperty("acks", "0");
        transactionalWithAcksZero.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithAcksZero),
            "Must set acks to all when using the transactional producer.");
    }

    /**
     * Exercises the interaction between {@code retries}, {@code enable.idempotence} and
     * {@code transactional.id} in {@link ProducerConfig}: two accepted combinations followed by
     * three that must raise {@link ConfigException}.
     */
    @Test
    public void testRetriesAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // retries=0, idempotence explicitly off
        Properties noRetriesIdempotenceOff = new Properties();
        noRetriesIdempotenceOff.putAll(baseProps);
        noRetriesIdempotenceOff.setProperty("retries", "0");
        noRetriesIdempotenceOff.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(noRetriesIdempotenceOff);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be overwritten");
        Assertions.assertEquals(0, config.getInt("retries"), "retries should be overwritten");

        // retries=0, idempotence left unset
        Properties noRetriesOnly = new Properties();
        noRetriesOnly.putAll(baseProps);
        noRetriesOnly.setProperty("retries", "0");
        config = new ProducerConfig(noRetriesOnly);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be disabled when retries set to 0 and `enable.idempotence` config is unset.");
        Assertions.assertEquals(0, config.getInt("retries"), "retries should be set with overridden value");

        // transactional.id together with idempotence explicitly off must be rejected
        Properties transactionalWithoutIdempotence = new Properties();
        transactionalWithoutIdempotence.putAll(baseProps);
        transactionalWithoutIdempotence.setProperty("retries", "0");
        transactionalWithoutIdempotence.setProperty("enable.idempotence", "false");
        transactionalWithoutIdempotence.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithoutIdempotence),
            "Cannot set a transactional.id without also enabling idempotence");

        // idempotence explicitly on with retries=0 must be rejected
        Properties idempotentWithoutRetries = new Properties();
        idempotentWithoutRetries.putAll(baseProps);
        idempotentWithoutRetries.setProperty("retries", "0");
        idempotentWithoutRetries.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(idempotentWithoutRetries),
            "Must set retries to non-zero when using the idempotent producer.");

        // transactional.id with retries=0 must be rejected
        Properties transactionalWithoutRetries = new Properties();
        transactionalWithoutRetries.putAll(baseProps);
        transactionalWithoutRetries.setProperty("retries", "0");
        transactionalWithoutRetries.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithoutRetries),
            "Must set retries to non-zero when using the transactional producer.");
    }

    /**
     * Exercises the interaction between {@code max.in.flight.requests.per.connection},
     * {@code enable.idempotence} and {@code transactional.id} in {@link ProducerConfig}: two
     * accepted combinations followed by three that must raise {@link ConfigException}.
     *
     * <p>The third argument of each {@code assertThrows} is the message shown when the assertion
     * itself fails, so it has to describe the scenario under test. The last one previously carried
     * a message about {@code retries} and the idempotent producer; it now names the in-flight
     * limit and the transactional producer.
     */
    @Test
    public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
        Properties baseProps = new Properties();
        baseProps.setProperty("bootstrap.servers", "localhost:9999");
        baseProps.setProperty("key.serializer", StringSerializer.class.getName());
        baseProps.setProperty("value.serializer", StringSerializer.class.getName());

        // 6 in-flight requests, idempotence explicitly off
        Properties sixInflightIdempotenceOff = new Properties();
        sixInflightIdempotenceOff.putAll(baseProps);
        sixInflightIdempotenceOff.setProperty("max.in.flight.requests.per.connection", "6");
        sixInflightIdempotenceOff.setProperty("enable.idempotence", "false");
        ProducerConfig config = new ProducerConfig(sixInflightIdempotenceOff);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be overwritten");
        Assertions.assertEquals(6, config.getInt("max.in.flight.requests.per.connection"), "max.in.flight.requests.per.connection should be overwritten");

        // 6 in-flight requests, idempotence left unset
        Properties sixInflightOnly = new Properties();
        sixInflightOnly.putAll(baseProps);
        sixInflightOnly.setProperty("max.in.flight.requests.per.connection", "6");
        config = new ProducerConfig(sixInflightOnly);
        Assertions.assertFalse(config.getBoolean("enable.idempotence").booleanValue(), "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and `enable.idempotence` config is unset.");
        Assertions.assertEquals(6, config.getInt("max.in.flight.requests.per.connection"), "`max.in.flight.requests.per.connection` should be set with overridden value");

        // transactional.id together with idempotence explicitly off must be rejected
        Properties transactionalWithoutIdempotence = new Properties();
        transactionalWithoutIdempotence.putAll(baseProps);
        transactionalWithoutIdempotence.setProperty("max.in.flight.requests.per.connection", "5");
        transactionalWithoutIdempotence.setProperty("enable.idempotence", "false");
        transactionalWithoutIdempotence.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithoutIdempotence),
            "Cannot set a transactional.id without also enabling idempotence");

        // idempotence explicitly on with 6 in-flight requests must be rejected
        Properties idempotentWithSixInflight = new Properties();
        idempotentWithSixInflight.putAll(baseProps);
        idempotentWithSixInflight.setProperty("max.in.flight.requests.per.connection", "6");
        idempotentWithSixInflight.setProperty("enable.idempotence", "true");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(idempotentWithSixInflight),
            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");

        // transactional.id with 6 in-flight requests must be rejected
        Properties transactionalWithSixInflight = new Properties();
        transactionalWithSixInflight.putAll(baseProps);
        transactionalWithSixInflight.setProperty("max.in.flight.requests.per.connection", "6");
        transactionalWithSixInflight.setProperty("transactional.id", "transactionalId");
        Assertions.assertThrows(
            ConfigException.class,
            () -> new ProducerConfig(transactionalWithSixInflight),
            "Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer.");
    }

    /**
     * A producer created without an explicit {@code client.id} must hand the same client id it
     * reports from {@code getClientId()} to the configured {@link MockMetricsReporter}, and must
     * end up with three metrics reporters in total.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test
    public void testMetricsReporterAutoGeneratedClientId() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(3, producer.metrics.reporters().size());

            // Fail with a readable message rather than a bare NoSuchElementException.
            MockMetricsReporter mockReporter = producer.metrics.reporters().stream()
                .filter(MockMetricsReporter.class::isInstance)
                .map(MockMetricsReporter.class::cast)
                .findFirst()
                .orElseThrow(() -> new AssertionError("no MockMetricsReporter registered on the producer"));

            Assertions.assertEquals(producer.getClientId(), mockReporter.clientId);
        }
    }

    /**
     * With {@code auto.include.jmx.reporter} and {@code enable.metrics.push} both set to
     * {@code false}, the producer must have no metrics reporters.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when the assertion fails.
     */
    @Test
    public void testDisableJmxAndClientTelemetryReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("auto.include.jmx.reporter", "false");
        props.setProperty("enable.metrics.push", "false");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertTrue(producer.metrics.reporters().isEmpty());
        }
    }

    /**
     * With the JMX reporter listed explicitly and {@code enable.metrics.push} set to
     * {@code false}, the producer must have exactly one reporter and it must be a
     * {@link JmxReporter}.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test
    public void testExplicitlyOnlyEnableJmxReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        props.setProperty("enable.metrics.push", "false");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(1, producer.metrics.reporters().size());
            Assertions.assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
        }
    }

    /**
     * With {@code auto.include.jmx.reporter} set to {@code false} and nothing else changed, the
     * producer must have exactly one reporter and it must be a {@link ClientTelemetryReporter}.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test
    public void testExplicitlyOnlyEnableClientTelemetryReporter() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("auto.include.jmx.reporter", "false");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assertions.assertEquals(1, producer.metrics.reporters().size());
            Assertions.assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
        }
    }

    /**
     * Constructing a producer from {@link Properties} plus explicit byte-array serializers and
     * closing it straight away must not throw.
     */
    @Test
    public void testConstructorWithSerializers() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");

        KafkaProducer<byte[], byte[]> producer =
            new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
        producer.close();
    }

    /**
     * Constructing a producer without any serializer configuration must raise
     * {@link ConfigException}, both for the {@link Properties} and for the {@link Map} constructor.
     */
    @Test
    public void testNoSerializerProvided() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        Assertions.assertThrows(ConfigException.class, () -> new KafkaProducer<>(producerProps));

        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put("bootstrap.servers", "localhost:9999");
        Assertions.assertThrows(ConfigException.class, () -> new KafkaProducer<>(producerConfigs));
    }

    /**
     * When producer construction fails with a {@link KafkaException}, the configured
     * {@link MockMetricsReporter} must have been initialised once and closed once, and the
     * exception must carry the message "Failed to construct kafka producer".
     *
     * <p>Written as a real try-with-resources: should construction unexpectedly succeed, the
     * producer is closed before the {@code fail(...)} error propagates. The previous hand-expanded
     * form skipped {@code close()} whenever {@code fail(...)} threw and contained unreachable
     * branches.
     */
    @Test
    public void testConstructorFailureCloseResource() {
        Properties props = new Properties();
        props.setProperty("client.id", "testConstructorClose");
        props.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        props.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

        // Counters are static; compare against their values before construction.
        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();

        try (KafkaProducer<byte[], byte[]> ignored = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assertions.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
            Assertions.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assertions.assertEquals("Failed to construct kafka producer", e.getMessage());
        }
    }

    /**
     * A {@link Properties} object holding a non-String key must be rejected with a
     * {@link ConfigException} whose message contains "not string key".
     */
    @Test
    public void testConstructorWithNotStringKey() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.put(1, "not string key");

        ConfigException thrown = Assertions.assertThrows(
            ConfigException.class,
            () -> new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()));

        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("not string key"), "Unexpected exception message: " + message);
    }

    /**
     * Constructing a producer with two {@link MockSerializer} instances must raise the serializer
     * init counter by two without touching the close counter; closing the producer must then
     * raise the close counter by two.
     */
    @Test
    public void testSerializerClose() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("client.id", "testConstructorClose");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        configs.put("security.protocol", "PLAINTEXT");

        // Counters are static; compare against their values before construction.
        final int oldInitCount = MockSerializer.INIT_COUNT.get();
        final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();

        KafkaProducer<?, ?> producer = new KafkaProducer<>(configs, new MockSerializer(), new MockSerializer());
        Assertions.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
        Assertions.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());

        producer.close();
        Assertions.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
        Assertions.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
    }

    /**
     * Verifies the interceptor life cycle: one configured {@code MockProducerInterceptor} is
     * initialised once by the constructor and closed once by {@code close()}. The static counters
     * are reset afterwards regardless of the outcome.
     */
    @Test
    public void testInterceptorConstructClose() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
        props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
        try {
            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

            // After construction: initialised, not closed, and no cluster metadata recorded.
            Assertions.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
            Assertions.assertNull(MockProducerInterceptor.CLUSTER_META.get());

            producer.close();

            Assertions.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            // Leave the shared static counters clean for other tests.
            MockProducerInterceptor.resetCounters();
        }
    }

    /**
     * Configures three {@code MockProducerInterceptor} instances with the configure-failure
     * threshold set to 3 and verifies that the producer constructor fails with a
     * {@link KafkaException} while all three interceptors were configured and all three closed.
     */
    @Test
    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
        final int interceptorCount = 3;
        String interceptorName = MockProducerInterceptor.class.getName();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("interceptor.classes", String.join(", ", interceptorName, interceptorName, interceptorName));
        props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
        try {
            MockProducerInterceptor.setThrowOnConfigExceptionThreshold(interceptorCount);

            Assertions.assertThrows(
                    KafkaException.class,
                    () -> new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()));

            Assertions.assertEquals(3, MockProducerInterceptor.CONFIG_COUNT.get());
            Assertions.assertEquals(3, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            // Leave the shared static counters clean for other tests.
            MockProducerInterceptor.resetCounters();
        }
    }

    /**
     * Verifies the partitioner life cycle: the configured {@code MockPartitioner} is initialised
     * once by the constructor and closed once by {@code close()}. Counters are reset before the
     * producer is built and again afterwards.
     */
    @Test
    public void testPartitionerClose() {
        try {
            MockPartitioner.resetCounters();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("partitioner.class", MockPartitioner.class.getName());

            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            Assertions.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assertions.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());

            producer.close();
            Assertions.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assertions.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
        } finally {
            MockPartitioner.resetCounters();
        }
    }

    /**
     * Verifies that {@code close()} blocks while a send is outstanding and that interrupting the
     * closing thread surfaces an {@link InterruptException}.
     *
     * <p>A worker thread sends one record and then calls {@code close()}. The test checks that the
     * worker has not finished after 100 ms, waits for the request to reach the mock client,
     * cancels the worker with interruption, and finally inspects the exception the worker captured.
     *
     * @throws Exception if waiting on the mock client or on the captured exception fails
     */
    @Test
    public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("partitioner.class", MockPartitioner.class.getName());
        configs.put("batch.size", "1");

        MockTime time = new MockTime();
        MetadataResponse initialUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdate);

        KafkaProducer<String, String> producer =
                kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> closeException = new AtomicReference<>();
        try {
            Future<?> closeTask = executor.submit(() -> {
                producer.send(new ProducerRecord<>("topic", "key", "value"));
                try {
                    producer.close();
                    Assertions.fail("Close should block and throw.");
                } catch (Exception e) {
                    closeException.set(e);
                }
            });

            // close() must not complete while the send is still outstanding.
            try {
                closeTask.get(100L, TimeUnit.MILLISECONDS);
                Assertions.fail("Close completed without waiting for send");
            } catch (TimeoutException expected) {
                // Timing out is the success path: the worker is still blocked.
            }

            // Make sure the request has actually been handed to the client before interrupting.
            client.waitForRequests(1, 1000L);
            Assertions.assertTrue(closeTask.cancel(true), "Close terminated prematurely");

            TestUtils.waitForCondition(() -> closeException.get() != null,
                    "InterruptException did not occur within timeout.");
            Assertions.assertInstanceOf(InterruptException.class, closeException.get(),
                    "Expected exception not thrown " + closeException);
        } finally {
            // Single cleanup point for both the success and the failure path.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies that a producer can be constructed and closed when both socket buffer sizes are
     * configured as {@code -1}.
     */
    @Test
    public void testOsDefaultSocketBufferSizes() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("send.buffer.bytes", -1);
        configs.put("receive.buffer.bytes", -1);

        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(configs, new ByteArraySerializer(), new ByteArraySerializer());
        producer.close();
    }

    /**
     * Verifies that a {@code send.buffer.bytes} value of {@code -2} makes the producer constructor
     * fail with a {@link KafkaException}.
     */
    @Test
    public void testInvalidSocketSendBufferSize() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("send.buffer.bytes", -2);

        Assertions.assertThrows(
                KafkaException.class,
                () -> new KafkaProducer<>(configs, new ByteArraySerializer(), new ByteArraySerializer()));
    }

    /**
     * Verifies that a {@code receive.buffer.bytes} value of {@code -2} makes the producer
     * constructor fail with a {@link KafkaException}.
     */
    @Test
    public void testInvalidSocketReceiveBufferSize() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("receive.buffer.bytes", -2);

        Assertions.assertThrows(
                KafkaException.class,
                () -> new KafkaProducer<>(configs, new ByteArraySerializer(), new ByteArraySerializer()));
    }

    /**
     * Convenience overload of {@link #producerWithOverrideNewSender(Map, ProducerMetadata, Time)}
     * that uses {@link Time#SYSTEM} as the producer's clock.
     *
     * @param map producer configuration
     * @param producerMetadata metadata instance handed to the producer
     * @return the producer built by the three-argument overload
     */
    private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> map, ProducerMetadata producerMetadata) {
        final Time systemClock = Time.SYSTEM;
        return producerWithOverrideNewSender(map, producerMetadata, systemClock);
    }

    /**
     * Builds a String/String producer with two overrides:
     * <ul>
     *   <li>its {@link MockClient} always reports {@code KafkaProducerTest.NODE} as the least
     *       loaded node, and</li>
     *   <li>its {@code Sender} is created with a fresh metadata instance instead of
     *       {@code producerMetadata}.</li>
     * </ul>
     * Presumably this keeps calls on {@code producerMetadata} attributable to the producer alone,
     * so tests can verify them on a mock (confirm against the callers).
     *
     * @param map producer configuration; serializer settings are appended to it
     * @param producerMetadata metadata instance handed to the producer and the mock client
     * @param time clock handed to the producer (the mock client itself uses {@link Time#SYSTEM})
     * @return the customised producer
     */
    private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> map, ProducerMetadata producerMetadata, Time time) {
        ProducerConfig config = new ProducerConfig(
                ProducerConfig.appendSerializerToConfig(map, new StringSerializer(), new StringSerializer()));
        StringSerializer keySerializer = new StringSerializer();
        StringSerializer valueSerializer = new StringSerializer();

        MockClient fixedNodeClient = new MockClient(Time.SYSTEM, producerMetadata) {
            @Override
            public LeastLoadedNode leastLoadedNode(long nowMs) {
                return new LeastLoadedNode(KafkaProducerTest.NODE, true);
            }
        };

        return new KafkaProducer<String, String>(config, keySerializer, valueSerializer, producerMetadata, fixedNodeClient, null, time) {
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata ignoredMetadata) {
                // The sender gets its own metadata instance rather than the producer's.
                ProducerMetadata senderMetadata = KafkaProducerTest.newMetadata(0L, 0L, 100000L);
                return super.newSender(logContext, kafkaClient, senderMetadata);
            }
        };
    }

    /**
     * Verifies how often the producer consults its metadata. The mocked {@code fetch()} returns an
     * empty cluster four times and then a cluster containing the topic's partition, so the first
     * send is expected to request and await four updates and fetch five times. The following send
     * and {@code partitionsFor} call are each expected to add exactly one fetch and no updates.
     *
     * @param z value used for {@code enable.idempotence}
     * @throws InterruptedException declared by the verified {@code awaitUpdate} call
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataFetch(boolean z) throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("enable.idempotence", z);

        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        Mockito.when(metadata.fetch())
                .thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);

        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

        // First send: the topic is unknown until the fifth fetch.
        producer.send(record);
        Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(5)).fetch();

        // Second send: one more fetch, no further update requests.
        producer.send(record, null);
        Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(6)).fetch();

        // partitionsFor: again one more fetch, no further update requests.
        producer.partitionsFor("topic");
        Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(7)).fetch();

        producer.close(Duration.ofMillis(0L));
    }

    /**
     * Verifies that the producer refreshes metadata when a later fetch no longer contains the
     * topic. The mocked {@code fetch()} returns the one-partition cluster, then an empty cluster,
     * then the one-partition cluster again: the first send needs no update and a single fetch,
     * the second send is expected to request and await one update and bring the fetch total to three.
     *
     * @param z value used for {@code enable.idempotence}
     * @throws InterruptedException declared by the verified {@code awaitUpdate} call
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataExpiry(boolean z) throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("enable.idempotence", z);

        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        Mockito.when(metadata.fetch()).thenReturn(onePartitionCluster, emptyCluster, onePartitionCluster);

        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

        // First send: the topic is already known.
        producer.send(record);
        Mockito.verify(metadata, Mockito.times(0)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(0)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(1)).fetch();

        // Second send: the topic has disappeared once, forcing a single refresh.
        producer.send(record, null);
        Mockito.verify(metadata, Mockito.times(1)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(1)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(3)).fetch();

        producer.close(Duration.ofMillis(0L));
    }

    /**
     * Verifies that a send fails with a Kafka {@code TimeoutException} when the topic never shows
     * up in metadata. The mocked {@code fetch()} always returns an empty cluster and, on its fifth
     * invocation, advances the mock clock by 70000 ms, which is more than the configured
     * {@code max.block.ms} of 60000. The send is expected to have requested and awaited four
     * updates and fetched five times.
     *
     * @param z value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataTimeoutWithMissingTopic(boolean z) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", z);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime time = new MockTime();
        AtomicInteger fetchCalls = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Increment and compare in one atomic step so the clock jumps exactly once.
            if (fetchCalls.incrementAndGet() == 5) {
                time.setCurrentTimeMs(time.milliseconds() + 70000);
            }
            return emptyCluster;
        });

        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata, time);
        try {
            Future<RecordMetadata> result = producer.send(record);

            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            ExecutionException failure = Assertions.assertThrows(ExecutionException.class, result::get);
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, failure.getCause());
        } finally {
            // Close even when a verification fails, so the producer never outlives the test.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that a send to partition 2 keeps refreshing metadata until the partition exists.
     * The mocked {@code fetch()} returns the one-partition cluster twice and then the
     * three-partition cluster, so the send is expected to request and await two updates and fetch
     * three times.
     *
     * @param z value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataWithPartitionOutOfRange(boolean z) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", z);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime time = new MockTime();
        Mockito.when(metadata.fetch()).thenReturn(onePartitionCluster, onePartitionCluster, threePartitionCluster);

        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata, time);
        producer.send(record);

        Mockito.verify(metadata, Mockito.times(2)).requestUpdateForTopic("topic");
        Mockito.verify(metadata, Mockito.times(2)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata, Mockito.times(3)).fetch();

        producer.close(Duration.ofMillis(0L));
    }

    /**
     * Verifies that a send to partition 2 fails with a Kafka {@code TimeoutException} when the
     * partition never appears. The mocked {@code fetch()} always returns the one-partition cluster
     * and, on its fifth invocation, advances the mock clock by 70000 ms, which is more than the
     * configured {@code max.block.ms} of 60000. The send is expected to have requested and awaited
     * four updates and fetched five times.
     *
     * @param z value used for {@code enable.idempotence}
     * @throws Exception declared by the verified {@code awaitUpdate} call
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataTimeoutWithPartitionOutOfRange(boolean z) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);
        configs.put("enable.idempotence", z);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime time = new MockTime();
        AtomicInteger fetchCalls = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Increment and compare in one atomic step so the clock jumps exactly once.
            if (fetchCalls.incrementAndGet() == 5) {
                time.setCurrentTimeMs(time.milliseconds() + 70000);
            }
            return onePartitionCluster;
        });

        KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata, time);
        try {
            Future<RecordMetadata> result = producer.send(record);

            Mockito.verify(metadata, Mockito.times(4)).requestUpdateForTopic("topic");
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            ExecutionException failure = Assertions.assertThrows(ExecutionException.class, result::get);
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, failure.getCause());
        } finally {
            // Close even when a verification fails, so the producer never outlives the test.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that {@code partitionsFor} times out with an
     * {@code UnknownTopicOrPartitionException} as its cause when every metadata update reports the
     * topic as unknown.
     *
     * <p>A background thread repeatedly waits for an update request (or until 100 ms of wall-clock
     * time have passed since it started), answers with an {@code UNKNOWN_TOPIC_OR_PARTITION}
     * response and advances the mock clock by 60000 ms.
     *
     * @throws InterruptedException if joining the background thread is interrupted
     */
    @Test
    public void testTopicRefreshInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "600000");
        configs.put("enable.idempotence", false);

        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(500L, 5000L, 60000L, 60000L, new LogContext(), new ClusterResourceListeners(), time);
        MockClient client = new MockClient(time, metadata);

        try (KafkaProducer<String, String> producer =
                     kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            AtomicBoolean running = new AtomicBoolean(true);
            Thread updater = new Thread(() -> {
                long startMs = System.currentTimeMillis();
                while (running.get()) {
                    while (!metadata.updateRequested() && System.currentTimeMillis() - startMs < 100) {
                        Thread.yield();
                    }
                    metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("topic", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()), false, time.milliseconds());
                    time.sleep(60000L);
                }
            });
            updater.start();
            try {
                org.apache.kafka.common.errors.TimeoutException timeout = Assertions.assertThrows(
                        org.apache.kafka.common.errors.TimeoutException.class,
                        () -> producer.partitionsFor("topic"));
                Assertions.assertInstanceOf(UnknownTopicOrPartitionException.class, timeout.getCause());
            } finally {
                // Always stop the updater, otherwise a failed assertion leaves it spinning forever.
                running.set(false);
                updater.join();
            }
        }
    }

    /**
     * Verifies that {@code partitionsFor} times out with an
     * {@code UnknownTopicOrPartitionException} as its cause when metadata reports the topic as
     * unknown.
     *
     * <p>A background thread first installs an {@code UNKNOWN_TOPIC_OR_PARTITION} response, then
     * meets the test thread at an {@link Exchanger}, waits until the producer has requested an
     * update and finally advances the mock clock by 30000 ms, the configured {@code max.block.ms}.
     *
     * @throws InterruptedException if the exchange or the join is interrupted
     */
    @Test
    public void testTopicNotExistingInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "30000");

        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(500L, 5000L, 60000L, 60000L, new LogContext(), new ClusterResourceListeners(), time);
        MockClient client = new MockClient(time, metadata);

        try (KafkaProducer<String, String> producer =
                     kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            Exchanger<Void> barrier = new Exchanger<>();
            Thread updater = new Thread(() -> {
                try {
                    metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("topic", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()), false, time.milliseconds());
                    // Release the test thread only after the error response is in place.
                    barrier.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    time.sleep(30000L);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            updater.start();
            barrier.exchange(null);

            org.apache.kafka.common.errors.TimeoutException timeout = Assertions.assertThrows(
                    org.apache.kafka.common.errors.TimeoutException.class,
                    () -> producer.partitionsFor("topic"));
            Assertions.assertInstanceOf(UnknownTopicOrPartitionException.class, timeout.getCause());

            updater.join();
        }
    }

    /**
     * Verifies that {@code partitionsFor} succeeds while metadata for the topic is available and
     * later times out once the producer has to ask for the topic again and no answer arrives.
     *
     * <p>The test thread and a background thread are kept in lock-step with an {@link Exchanger}:
     * <ol>
     *   <li>after the first exchange the background thread waits for an update request and answers
     *       it with metadata containing the topic, so the first {@code partitionsFor} succeeds;</li>
     *   <li>after the second exchange it advances the mock clock by 120000 ms and installs the
     *       same metadata again;</li>
     *   <li>after the third exchange it waits for another update request and then advances the
     *       clock by 30000 ms, the configured {@code max.block.ms}, without answering.</li>
     * </ol>
     *
     * @throws InterruptedException if an exchange or the join is interrupted
     */
    @Test
    public void testTopicExpiryInMetadata() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "30000");

        MockTime time = new MockTime();
        ProducerMetadata metadata = new ProducerMetadata(500L, 5000L, 60000L, 60000L, new LogContext(), new ClusterResourceListeners(), time);
        MockClient client = new MockClient(time, metadata);

        try (KafkaProducer<String, String> producer =
                     kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            Exchanger<Void> barrier = new Exchanger<>();
            Thread updater = new Thread(() -> {
                try {
                    barrier.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), false, time.milliseconds());
                    barrier.exchange(null);
                    // NOTE(review): presumably long enough for the topic to expire from the
                    // producer's metadata; confirm against ProducerMetadata's constructor arguments.
                    time.sleep(120000L);
                    metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), false, time.milliseconds());
                    barrier.exchange(null);
                    while (!metadata.updateRequested()) {
                        Thread.sleep(100L);
                    }
                    time.sleep(30000L);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            updater.start();

            barrier.exchange(null);
            Assertions.assertNotNull(producer.partitionsFor("topic"));
            barrier.exchange(null);
            barrier.exchange(null);
            Assertions.assertThrows(
                    org.apache.kafka.common.errors.TimeoutException.class,
                    () -> producer.partitionsFor("topic"));

            updater.join();
        }
    }

    /**
     * Runs the shared header checks in {@code doTestHeaders} with mocks of the plain
     * {@link Serializer} interface.
     */
    @Test
    public void testHeaders() {
        this.doTestHeaders(Serializer.class);
    }

    /**
     * Checks header handling around {@code send}: headers can be added before the record is sent,
     * adding one afterwards throws {@link IllegalStateException}, the last header for the key keeps
     * its pre-send value, and both serializers are invoked with the record's headers.
     *
     * @param cls serializer type to mock for both the key and the value serializer
     * @param <T> concrete serializer type
     */
    private <T extends Serializer<String>> void doTestHeaders(Class<T> cls) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        Serializer<String> keySerializer = Mockito.mock(cls);
        Serializer<String> valueSerializer = Mockito.mock(cls);

        long nowMs = Time.SYSTEM.milliseconds();
        ProducerMetadata metadata = newMetadata(0L, 0L, 90000L);
        metadata.add("topic", nowMs);
        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), false, nowMs);

        KafkaProducer<String, String> producer =
                kafkaProducer(configs, keySerializer, valueSerializer, metadata, null, null, Time.SYSTEM);
        try {
            // Both mocks simply turn the payload string into bytes.
            Mockito.when(keySerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                    .then(invocation -> ((String) invocation.getArgument(2)).getBytes(StandardCharsets.UTF_8));
            Mockito.when(valueSerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                    .then(invocation -> ((String) invocation.getArgument(2)).getBytes(StandardCharsets.UTF_8));

            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

            // Headers are still mutable before the send.
            record.headers().add(new RecordHeader("test", "header2".getBytes(StandardCharsets.UTF_8)));
            producer.send(record, null);

            // After the send the headers must reject further changes.
            Assertions.assertThrows(IllegalStateException.class,
                    () -> record.headers().add(new RecordHeader("test", "test".getBytes(StandardCharsets.UTF_8))));

            // Expected value first, so a failure message labels the two arrays correctly.
            Assertions.assertArrayEquals("header2".getBytes(StandardCharsets.UTF_8), record.headers().lastHeader("test").value());

            Mockito.verify(valueSerializer).serialize("topic", record.headers(), "value");
            Mockito.verify(keySerializer).serialize("topic", record.headers(), "key");
        } finally {
            // Close even when an assertion fails, so the producer never outlives the test.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Verifies that calling {@code close()} twice on the same producer completes without throwing.
     */
    @Test
    public void closeShouldBeIdempotent() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");

        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
        for (int attempt = 0; attempt < 2; attempt++) {
            producer.close();
        }
    }

    /**
     * Verifies that {@code close(Duration)} rejects a negative timeout with an
     * {@link IllegalArgumentException}. The producer is then closed normally by the
     * try-with-resources statement.
     */
    @Test
    public void closeWithNegativeTimestampShouldThrow() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");

        try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> producer.close(Duration.ofMillis(-100L)));
        }
    }

    /**
     * Verifies that {@code flush()} completes every outstanding send: fifty records are sent, none
     * of the returned futures is done before the flush, and all of them are done afterwards.
     */
    @Test
    public void testFlushCompleteSendOfInflightBatches() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("enable.idempotence", false);

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdate);

        try (KafkaProducer<String, String> producer =
                     kafkaProducer(configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            List<Future<RecordMetadata>> pending = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                pending.add(producer.send(new ProducerRecord<>("topic", "value" + i)));
            }
            pending.forEach(result -> Assertions.assertFalse(result.isDone()));

            producer.flush();

            pending.forEach(result -> Assertions.assertTrue(result.isDone()));
        }
    }

    /**
     * Reads a metric from the {@code "producer-metrics"} group of the given producer.
     *
     * @param kafkaProducer producer whose metrics registry is queried
     * @param str name of the metric inside the {@code "producer-metrics"} group
     * @return the metric's current value, cast to {@link Double}
     */
    private static Double getMetricValue(KafkaProducer<?, ?> kafkaProducer, String str) {
        Metrics registry = kafkaProducer.metrics;
        Object value = registry.metric(registry.metricName(str, "producer-metrics")).metricValue();
        return (Double) value;
    }

    /**
     * Checks that the {@code flush-time-ns-total} metric is positive after a first
     * {@code flush()} and strictly larger after a second one.
     */
    @Test
    public void testFlushMeasureLatency() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");

        // NOTE(review): MockTime(1L) presumably advances the mock clock on every read, which is
        // what would make the measured flush duration non-zero -- confirm against MockTime.
        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        // try-with-resources closes the producer on every path and keeps an assertion failure
        // as the primary exception (a close() failure is attached as suppressed).
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.flush();
            double afterFirstFlush = getMetricValue(producer, "flush-time-ns-total").doubleValue();
            Assertions.assertTrue(afterFirstFlush > 0.0d);

            producer.flush();
            double afterSecondFlush = getMetricValue(producer, "flush-time-ns-total").doubleValue();
            Assertions.assertTrue(afterSecondFlush > afterFirstFlush);
        }
    }

    /**
     * Checks the recording level of the producer's metrics configuration: {@code INFO} when
     * {@code metrics.recording.level} is not set, and {@code DEBUG} when it is set to
     * {@code "DEBUG"}.
     */
    @Test
    public void testMetricConfigRecordingLevel() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");

        // Each producer lives in its own try-with-resources so it is closed even when the
        // assertion fails.
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
        }

        props.put("metrics.recording.level", "DEBUG");
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
        }
    }

    /**
     * Sends a record with {@code max.request.size} set to {@code "1"} and verifies that the
     * interceptors see {@code onSend} for the record and {@code onSendError} with a non-null
     * {@code TopicPartition} and a non-null exception.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.request.size", "1");
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

        long nowMs = Time.SYSTEM.milliseconds();
        ProducerMetadata metadata = newMetadata(0L, 0L, 90000L);
        metadata.add("topic", nowMs);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, nowMs);

        // The unchecked assignment is confined to this mock; Mockito.mock() can only hand back the raw type.
        ProducerInterceptors<String, String> interceptors = Mockito.mock(ProducerInterceptors.class);
        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);
        try {
            // onSend hands the record back unchanged.
            Mockito.when(interceptors.onSend(ArgumentMatchers.any())).then(invocation -> invocation.getArgument(0));

            producer.send(record);

            Mockito.verify(interceptors).onSend(record);
            Mockito.verify(interceptors).onSendError(ArgumentMatchers.eq(record), ArgumentMatchers.notNull(), ArgumentMatchers.notNull());
        } finally {
            // Zero-timeout close, as before, but now also reached when a verification fails.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Checks that {@code partitionsFor(null)} throws {@link NullPointerException}.
     */
    @Test
    public void testPartitionsForWithNullTopic() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9000");

        // try-with-resources closes the producer on every path and keeps an assertion failure
        // as the primary exception (a close() failure is attached as suppressed).
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assertions.assertThrows(NullPointerException.class, () -> producer.partitionsFor(null));
        }
    }

    /**
     * Runs {@code initTransactions()} on a background thread, advances the mock clock by
     * {@code max.block.ms} so that call fails with a Kafka {@code TimeoutException}, then
     * delivers the InitProducerId response late and checks that a second
     * {@code initTransactions()} call returns normally.
     *
     * @throws Exception if waiting on the condition, the future, or the sleep fails
     */
    @Test
    public void testInitTransactionsResponseAfterTimeout() throws Exception {
        int maxBlockMs = 500;

        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", maxBlockMs);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        ExecutorService executor = Executors.newFixedThreadPool(1);
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            // Only the transaction-coordinator lookup is answered up front.
            client.prepareResponse(
                request -> request instanceof FindCoordinatorRequest
                    && ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));

            Future<?> future = executor.submit(producer::initTransactions);
            TestUtils.waitForCondition(client::hasInFlightRequests,
                "Timed out while waiting for expected `InitProducerId` request to be sent");

            // Let the blocked call run out of its max.block.ms budget on the mock clock.
            time.sleep(maxBlockMs);
            TestUtils.assertFutureThrows(future, org.apache.kafka.common.errors.TimeoutException.class);

            // The response arrives only after the caller has already timed out.
            client.respond(initProducerIdResponse(1L, (short) 5, Errors.NONE));

            // NOTE(review): wall-clock pause, presumably to let the background sender process
            // the late response -- confirm whether a deterministic wait is available.
            Thread.sleep(1000L);
            producer.initTransactions();
        } finally {
            // The pool thread was previously never released.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that {@code initTransactions()} throws a Kafka {@code TimeoutException} when only
     * the coordinator lookup is answered, and that a later call succeeds once both the
     * coordinator lookup and the InitProducerId responses are prepared.
     */
    @Test
    public void testInitTransactionTimeout() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 500);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            // First attempt: no InitProducerId response is prepared, so the call must time out.
            client.prepareResponse(
                request -> request instanceof FindCoordinatorRequest
                    && ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
            Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, producer::initTransactions);

            // Second attempt: both responses are available.
            client.prepareResponse(
                request -> request instanceof FindCoordinatorRequest
                    && ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
            client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
            producer.initTransactions();
        }
    }

    /**
     * Checks that {@code initTransactions()} returns normally when the first node in the
     * metadata is throttled for 5000 ms and {@code max.block.ms} is 10000.
     */
    @Test
    public void testInitTransactionWhileThrottled() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        Node node = metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
        }
    }

    /**
     * Checks that {@code initTransactions()} throws {@code ClusterAuthorizationException} when
     * the InitProducerId response carries {@code CLUSTER_AUTHORIZATION_FAILED}, and that the
     * call can be retried successfully once an error-free response is prepared.
     *
     * @throws Exception if the retry helper gives up or is interrupted
     */
    @Test
    public void testClusterAuthorizationFailure() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("max.block.ms", 500);
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("enable.idempotence", true);
        configs.put("transactional.id", "some-txn");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(500L, 5000L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));

        // try-with-resources: the producer used to leak whenever an assertion below failed.
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            Assertions.assertThrows(ClusterAuthorizationException.class, producer::initTransactions);

            // Retry after the authorization failure, this time with an error-free response.
            client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
            TestUtils.retryOnExceptionWithTimeout(1000L, 100L, producer::initTransactions);
        }
    }

    /**
     * Checks that a transaction can be initialized, begun and aborted without an exception
     * when the coordinator, InitProducerId and EndTxn responses are all error-free.
     */
    @Test
    public void testAbortTransaction() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        client.prepareResponse(endTxnResponse(Errors.NONE));

        // try-with-resources closes the producer on every path and keeps a test failure as the
        // primary exception (a close() failure is attached as suppressed).
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.abortTransaction();
        }
    }

    /**
     * Checks that the {@code txn-abort-time-ns-total} metric is positive after one aborted
     * transaction and strictly larger after a second one.
     */
    @Test
    public void testMeasureAbortTransactionDuration() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();

            // First abort.
            client.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            producer.abortTransaction();
            double afterFirstAbort = getMetricValue(producer, "txn-abort-time-ns-total").doubleValue();
            Assertions.assertTrue(afterFirstAbort > 0.0d);

            // Second abort must add to the running total.
            client.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            producer.abortTransaction();
            double afterSecondAbort = getMetricValue(producer, "txn-abort-time-ns-total").doubleValue();
            Assertions.assertTrue(afterSecondAbort > afterFirstAbort);
        }
    }

    /**
     * Sends a 1000-character value with {@code max.request.size} set to 1000 inside a
     * transaction, expects the send future to fail with {@code RecordTooLargeException}, and
     * checks that {@code commitTransaction()} then throws {@code KafkaException}.
     *
     * @throws Exception if checking the send future fails unexpectedly
     */
    @Test
    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.request.size", 1000);

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        Mockito.when(metadata.fetch()).thenReturn(this.onePartitionCluster);

        // 1000 '*' characters as the record value.
        String largeString = IntStream.range(0, 1000).mapToObj(i -> "*").collect(Collectors.joining());
        ProducerRecord<String, String> largeRecord = new ProducerRecord<>("topic", "large string", largeString);

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();

            client.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
            Assertions.assertThrows(KafkaException.class, producer::commitTransaction);
        }
    }

    /**
     * Sends to a topic while the mocked metadata only ever returns {@code emptyCluster},
     * expects the send future to fail with a Kafka {@code TimeoutException}, and checks that
     * {@code commitTransaction()} then throws {@code KafkaException}.
     *
     * @throws Exception if checking the send future fails unexpectedly
     */
    @Test
    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime time = new MockTime();
        MockClient client = new MockClient(time, metadata);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            invocationCount.incrementAndGet();
            if (invocationCount.get() > 5) {
                // Jump the mock clock 70000 ms ahead, beyond the 60000 ms max.block.ms.
                time.setCurrentTimeMs(time.milliseconds() + 70000);
            }
            return this.emptyCluster;
        });

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(record), org.apache.kafka.common.errors.TimeoutException.class);
            Assertions.assertThrows(KafkaException.class, producer::commitTransaction);
        }
    }

    /**
     * Sends to partition 2 while the mocked metadata only ever returns
     * {@code onePartitionCluster}, expects the send future to fail with a Kafka
     * {@code TimeoutException}, and checks that {@code commitTransaction()} then throws
     * {@code KafkaException}.
     *
     * @throws Exception if checking the send future fails unexpectedly
     */
    @Test
    public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);

        // Explicit partition 2, null key.
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", 2, null, "value");

        ProducerMetadata metadata = Mockito.mock(ProducerMetadata.class);
        MockTime time = new MockTime();
        MockClient client = new MockClient(time, metadata);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            invocationCount.incrementAndGet();
            if (invocationCount.get() > 5) {
                // Jump the mock clock 70000 ms ahead, beyond the 60000 ms max.block.ms.
                time.setCurrentTimeMs(time.milliseconds() + 70000);
            }
            return this.onePartitionCluster;
        });

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(record), org.apache.kafka.common.errors.TimeoutException.class);
            Assertions.assertThrows(KafkaException.class, producer::commitTransaction);
        }
    }

    /**
     * Sends to the topic {@code "topic abc"}, for which the prepared metadata update reports
     * {@code INVALID_TOPIC_EXCEPTION}, expects the send future to fail with
     * {@code InvalidTopicException}, and checks that {@code commitTransaction()} then throws
     * {@code KafkaException}.
     *
     * @throws Exception if checking the send future fails unexpectedly
     */
    @Test
    public void testCommitTransactionWithSendToInvalidTopic() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "15000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        ProducerRecord<String, String> record = new ProducerRecord<>("topic abc", "HelloKafka");

        // Metadata update that marks the target topic as invalid; brokers, cluster id and
        // controller are copied from the initial response.
        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, "topic abc", false, Collections.emptyList()));
        MetadataResponse updateResponse = RequestTestUtils.metadataResponse(
            new ArrayList<>(initialUpdateResponse.brokers()),
            initialUpdateResponse.clusterId(),
            initialUpdateResponse.controller().id(),
            topicMetadata);
        client.prepareMetadataUpdate(updateResponse);

        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            TestUtils.assertFutureError(producer.send(record), InvalidTopicException.class);
            Assertions.assertThrows(KafkaException.class, producer::commitTransaction);
        }
    }

    /**
     * Runs a full transaction (init, begin, send offsets, commit) against a mock client and
     * checks that the TxnOffsetCommit request carries the group id that was passed through a
     * group-id-only {@link ConsumerGroupMetadata}.
     */
    @Test
    public void testSendTxnOffsetsWithGroupId() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        Node node = metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);

        // Responses are queued in the order the producer is expected to issue its requests.
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));

        String groupId = "group";
        // The matcher inspects the outgoing TxnOffsetCommit request for the expected group id.
        client.prepareResponse(
            request -> ((TxnOffsetCommitRequest) request).data().groupId().equals(groupId),
            txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
        client.prepareResponse(endTxnResponse(Errors.NONE));

        // try-with-resources closes the producer exactly once, whether or not the body throws.
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(groupId));
            producer.commitTransaction();
        }
    }

    /**
     * Asserts that the named producer metric is at least {@code d}.
     *
     * @param kafkaProducer producer whose metric is read
     * @param str           name of the metric to look up
     * @param d             minimum value the metric must have reached
     */
    private void assertDurationAtLeast(KafkaProducer<?, ?> kafkaProducer, String str, double d) {
        // Same check as getAndAssertDurationAtLeast, but the observed value is not needed here.
        final double observed = getMetricValue(kafkaProducer, str).doubleValue();
        Assertions.assertTrue(observed >= d);
    }

    /**
     * Reads the named producer metric, asserts that it is at least {@code d}, and returns it.
     *
     * @param kafkaProducer producer whose metric is read
     * @param str           name of the metric to look up
     * @param d             minimum value the metric must have reached
     * @return the observed metric value
     */
    private double getAndAssertDurationAtLeast(KafkaProducer<?, ?> kafkaProducer, String str, double d) {
        final double observed = getMetricValue(kafkaProducer, str).doubleValue();
        final boolean reachedMinimum = observed >= d;
        Assertions.assertTrue(reachedMinimum);
        return observed;
    }

    /**
     * Checks that the cumulative transaction timing metrics ({@code txn-init}, {@code txn-begin},
     * {@code txn-send-offsets}, {@code txn-commit}) each grow by at least one clock tick per
     * corresponding producer call, across two consecutive transactions.
     */
    @Test
    public void testMeasureTransactionDurations() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        // The tick is both the MockTime constructor argument (in ms) and the lower bound used
        // in every assertion below (in ns). Presumably MockTime advances by this amount on each
        // read - confirm against MockTime.
        Duration tick = Duration.ofSeconds(1L);
        MockTime time = new MockTime(tick.toMillis());
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        // try-with-resources closes the producer exactly once, whether or not the body throws.
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            assertDurationAtLeast(producer, "txn-init-time-ns-total", tick.toNanos());

            // First transaction: remember each total so the second round can assert growth.
            client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
            client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
            client.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            double beginFirst = getAndAssertDurationAtLeast(producer, "txn-begin-time-ns-total", tick.toNanos());
            producer.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)),
                new ConsumerGroupMetadata("group"));
            double sendOffFirst = getAndAssertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", tick.toNanos());
            producer.commitTransaction();
            double commitFirst = getAndAssertDurationAtLeast(producer, "txn-commit-time-ns-total", tick.toNanos());

            // Second transaction: every total must have grown by at least one more tick.
            client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
            client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
            client.prepareResponse(endTxnResponse(Errors.NONE));
            producer.beginTransaction();
            assertDurationAtLeast(producer, "txn-begin-time-ns-total", beginFirst + tick.toNanos());
            producer.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(10L)),
                new ConsumerGroupMetadata("group"));
            assertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", sendOffFirst + tick.toNanos());
            producer.commitTransaction();
            assertDurationAtLeast(producer, "txn-commit-time-ns-total", commitFirst + tick.toNanos());
        }
    }

    /**
     * Runs a full transaction against a mock client and checks that the TxnOffsetCommit request
     * carries every field of the supplied {@link ConsumerGroupMetadata}: group id, member id,
     * generation id and group instance id.
     */
    @Test
    public void testSendTxnOffsetsWithGroupMetadata() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);
        // Advertise TXN_OFFSET_COMMIT versions 0 through 3 on the mock node.
        client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.TXN_OFFSET_COMMIT.id, (short) 0, (short) 3));

        Node node = metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);

        // Responses are queued in the order the producer is expected to issue its requests.
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
        client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));

        String groupId = "group";
        String memberId = "member";
        int generationId = 5;
        String groupInstanceId = "instance";

        // The matcher inspects the outgoing TxnOffsetCommit request for all four metadata fields.
        client.prepareResponse(request -> {
            TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest) request).data();
            return data.groupId().equals(groupId)
                && data.memberId().equals(memberId)
                && data.generationId() == generationId
                && data.groupInstanceId().equals(groupInstanceId);
        }, txnOffsetsCommitResponse(Collections.singletonMap(new TopicPartition("topic", 0), Errors.NONE)));
        client.prepareResponse(endTxnResponse(Errors.NONE));

        // try-with-resources closes the producer exactly once, whether or not the body throws.
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            ConsumerGroupMetadata groupMetadata =
                new ConsumerGroupMetadata(groupId, generationId, memberId, Optional.of(groupInstanceId));
            producer.sendOffsetsToTransaction(Collections.emptyMap(), groupMetadata);
            producer.commitTransaction();
        }
    }

    /** Passing {@code null} group metadata to {@code sendOffsetsToTransaction} must be rejected. */
    @Test
    public void testNullGroupMetadataInSendOffsets() {
        final ConsumerGroupMetadata absentMetadata = null;
        verifyInvalidGroupMetadata(absentMetadata);
    }

    /**
     * Group metadata that combines generation id {@code 2} with an empty member id must be
     * rejected by {@code sendOffsetsToTransaction}.
     */
    @Test
    public void testInvalidGenerationIdAndMemberIdCombinedInSendOffsets() {
        final ConsumerGroupMetadata inconsistentMetadata =
            new ConsumerGroupMetadata("group", 2, "", Optional.empty());
        verifyInvalidGroupMetadata(inconsistentMetadata);
    }

    /**
     * Checks that {@code KafkaProducer.clientInstanceId} returns the id supplied by the
     * telemetry sender of the reporter obtained from
     * {@code CommonClientConfigs.telemetryReporter}.
     */
    @Test
    public void testClientInstanceId() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");

        ClientTelemetryReporter clientTelemetryReporter = Mockito.mock(ClientTelemetryReporter.class);
        // NOTE(review): this invokes configure() on the mock with a matcher outside of any
        // stubbing or verification - confirm it is intentional.
        clientTelemetryReporter.configure(ArgumentMatchers.any());

        // Both the static mock and the producer are closed exactly once, in reverse order of
        // creation, whether or not an assertion fails.
        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs =
                 Mockito.mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
            mockedCommonClientConfigs
                .when(() -> CommonClientConfigs.telemetryReporter(ArgumentMatchers.anyString(), ArgumentMatchers.any()))
                .thenReturn(Optional.of(clientTelemetryReporter));

            ClientTelemetrySender clientTelemetrySender = Mockito.mock(ClientTelemetrySender.class);
            Uuid expectedUuid = Uuid.randomUuid();
            Mockito.when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
            Mockito.when(clientTelemetrySender.clientInstanceId(ArgumentMatchers.any())).thenReturn(Optional.of(expectedUuid));

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                Assertions.assertEquals(expectedUuid, producer.clientInstanceId(Duration.ofMillis(0L)));
            }
        }
    }

    /**
     * A negative timeout passed to {@code clientInstanceId} must raise an
     * {@link IllegalArgumentException} with the documented message.
     */
    @Test
    public void testClientInstanceIdInvalidTimeout() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");

        // try-with-resources guarantees the producer is closed even if an assertion fails.
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Exception exception = Assertions.assertThrows(IllegalArgumentException.class,
                () -> producer.clientInstanceId(Duration.ofMillis(-1L)));
            Assertions.assertEquals("The timeout cannot be negative.", exception.getMessage());
        }
    }

    /**
     * With {@code enable.metrics.push} set to {@code false}, {@code clientInstanceId} must raise
     * an {@link IllegalStateException} with the documented message.
     */
    @Test
    public void testClientInstanceIdNoTelemetryReporterRegistered() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("enable.metrics.push", "false");

        // try-with-resources guarantees the producer is closed even if an assertion fails.
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Exception exception = Assertions.assertThrows(IllegalStateException.class,
                () -> producer.clientInstanceId(Duration.ofMillis(0L)));
            Assertions.assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
        }
    }

    /**
     * Starts a transaction on a producer backed by a mock client and asserts that
     * {@code sendOffsetsToTransaction} rejects the given group metadata with an
     * {@link IllegalArgumentException}.
     *
     * @param consumerGroupMetadata the (invalid) metadata to pass; may be {@code null}
     */
    private void verifyInvalidGroupMetadata(ConsumerGroupMetadata consumerGroupMetadata) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "some.id");
        configs.put("max.block.ms", 10000);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        client.updateMetadata(initialUpdateResponse);

        Node node = metadata.fetch().nodes().get(0);
        client.throttle(node, 5000L);

        // Only the responses needed to get through initTransactions() are queued.
        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

        // try-with-resources closes the producer exactly once, whether or not the body throws.
        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            producer.beginTransaction();
            Assertions.assertThrows(IllegalArgumentException.class,
                () -> producer.sendOffsetsToTransaction(Collections.emptyMap(), consumerGroupMetadata));
        }
    }

    /**
     * Builds an {@link InitProducerIdResponse} with a throttle time of zero.
     *
     * @param j      producer id to report
     * @param s      producer epoch to report
     * @param errors error whose code is placed in the response
     * @return the assembled response
     */
    private InitProducerIdResponse initProducerIdResponse(long j, short s, Errors errors) {
        final InitProducerIdResponseData payload = new InitProducerIdResponseData()
            .setErrorCode(errors.code())
            .setProducerEpoch(s)
            .setProducerId(j)
            .setThrottleTimeMs(0);
        return new InitProducerIdResponse(payload);
    }

    /**
     * Builds an {@link AddOffsetsToTxnResponse} with a throttle time of 10 ms.
     *
     * @param errors error whose code is placed in the response
     * @return the assembled response
     */
    private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors errors) {
        final AddOffsetsToTxnResponseData payload = new AddOffsetsToTxnResponseData()
            .setErrorCode(errors.code())
            .setThrottleTimeMs(10);
        return new AddOffsetsToTxnResponse(payload);
    }

    /**
     * Builds a {@link TxnOffsetCommitResponse} from per-partition errors.
     *
     * @param map error to report for each topic partition
     * @return the assembled response, constructed with {@code 10} as its first argument
     *         (presumably the throttle time in ms - confirm against the constructor)
     */
    private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> map) {
        final TxnOffsetCommitResponse response = new TxnOffsetCommitResponse(10, map);
        return response;
    }

    /**
     * Builds an {@link EndTxnResponse} with a throttle time of zero.
     *
     * @param errors error whose code is placed in the response
     * @return the assembled response
     */
    private EndTxnResponse endTxnResponse(Errors errors) {
        final EndTxnResponseData payload = new EndTxnResponseData()
            .setErrorCode(errors.code())
            .setThrottleTimeMs(0);
        return new EndTxnResponse(payload);
    }

    /**
     * After {@code initTransactions} times out (no responses are prepared on the mock client and
     * {@code max.block.ms} is 5), further transactional calls must fail with
     * {@link IllegalStateException}; closing the producer is still allowed.
     */
    @Test
    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 5);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);
        try {
            Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, producer::initTransactions);
            // Other transactional operations must not be allowed once initTransactions has failed.
            Assertions.assertThrows(IllegalStateException.class, producer::beginTransaction);
        } finally {
            // Zero timeout: close without waiting, on success and on assertion failure alike.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * Sending to a topic that the metadata response flags with
     * {@code INVALID_TOPIC_EXCEPTION} must record the topic as invalid in the cluster view and
     * fail the returned future with {@link InvalidTopicException}.
     *
     * @throws Exception if waiting on the send future fails unexpectedly
     */
    @Test
    public void testSendToInvalidTopic() throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "15000");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);
        try {
            String invalidTopicName = "topic abc";
            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");

            // Next metadata update reports the topic with INVALID_TOPIC_EXCEPTION.
            List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
                invalidTopicName, false, Collections.emptyList()));
            MetadataResponse updateResponse = RequestTestUtils.metadataResponse(
                new ArrayList<>(initialUpdateResponse.brokers()),
                initialUpdateResponse.clusterId(),
                initialUpdateResponse.controller().id(),
                topicMetadata);
            client.prepareMetadataUpdate(updateResponse);

            Future<RecordMetadata> future = producer.send(record);

            Assertions.assertEquals(Collections.singleton(invalidTopicName), metadata.fetch().invalidTopics(),
                "Cluster has incorrect invalid topic list.");
            TestUtils.assertFutureError(future, InvalidTopicException.class);
        } finally {
            // Zero timeout: close without waiting, even when an assertion above has failed.
            producer.close(Duration.ofMillis(0L));
        }
    }

    /**
     * A {@code send} that is waiting for metadata of an unknown topic must fail with a
     * {@link KafkaException} once the producer is closed from another thread.
     *
     * @throws InterruptedException if interrupted while waiting for one of the conditions
     */
    @Test
    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("max.block.ms", Long.MAX_VALUE);
        configs.put("bootstrap.servers", "localhost:9000");

        final String topicName = "test";
        final Time time = Time.SYSTEM;
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = new ProducerMetadata(0L, 0L, Long.MAX_VALUE, Long.MAX_VALUE,
            new LogContext(), new ClusterResourceListeners(), time);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> sendException = new AtomicReference<>();
        try {
            // The background send is expected to throw; the exception is handed back for inspection.
            executor.submit(() -> {
                try {
                    producer.send(new ProducerRecord<>(topicName, "key", "value"));
                    Assertions.fail();
                } catch (Exception e) {
                    sendException.set(e);
                }
            });

            // Wait until send() has registered the topic before closing the producer underneath it.
            TestUtils.waitForCondition(() -> metadata.containsTopic(topicName),
                "Timeout when waiting for topic to be added to metadata");
            producer.close(Duration.ofMillis(0L));
            TestUtils.waitForCondition(() -> sendException.get() != null,
                "No producer exception within timeout");
            Assertions.assertEquals(KafkaException.class, sendException.get().getClass());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Calling {@code initTransactions} on a producer that has already been closed must raise an
     * {@link IllegalStateException}.
     */
    @Test
    public void testTransactionalMethodThrowsWhenSenderClosed() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);
        producer.close();

        Assertions.assertThrows(IllegalStateException.class, producer::initTransactions);
    }

    /**
     * Closing the producer while {@code initTransactions} is blocked on its first request (no
     * response is prepared) must make the blocked call fail with a {@link KafkaException}.
     *
     * @throws InterruptedException if interrupted while waiting for the background assertion
     */
    @Test
    public void testCloseIsForcedOnPendingFindCoordinator() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch assertionDoneLatch = new CountDownLatch(1);
            executor.submit(() -> {
                Assertions.assertThrows(KafkaException.class, producer::initTransactions);
                // Only reached when the assertion above passed.
                assertionDoneLatch.countDown();
            });

            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));

            // A failed background assertion never counts the latch down, so the await result
            // must be checked; otherwise the test would pass without verifying anything.
            Assertions.assertTrue(assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                "initTransactions did not fail with KafkaException after the producer was closed");
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Closing the producer while {@code initTransactions} is blocked after the FindCoordinator
     * response (the only response prepared) must make the blocked call fail with a
     * {@link KafkaException}.
     *
     * @throws InterruptedException if interrupted while waiting for the background assertion
     */
    @Test
    public void testCloseIsForcedOnPendingInitProducerId() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch assertionDoneLatch = new CountDownLatch(1);
            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
            executor.submit(() -> {
                Assertions.assertThrows(KafkaException.class, producer::initTransactions);
                // Only reached when the assertion above passed.
                assertionDoneLatch.countDown();
            });

            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));

            // A failed background assertion never counts the latch down, so the await result
            // must be checked; otherwise the test would pass without verifying anything.
            Assertions.assertTrue(assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                "initTransactions did not fail with KafkaException after the producer was closed");
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Closing the producer while {@code initTransactions} is blocked (only a FindCoordinator
     * response is prepared) must make the blocked call fail with a {@link KafkaException}.
     *
     * <p>NOTE(review): the setup is the same as in
     * {@code testCloseIsForcedOnPendingInitProducerId}; no add-offsets request is issued here
     * despite the test name - confirm whether that is intended.
     *
     * @throws InterruptedException if interrupted while waiting for the background assertion
     */
    @Test
    public void testCloseIsForcedOnPendingAddOffsetRequest() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("transactional.id", "this-is-a-transactional-id");

        MockTime time = new MockTime();
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("testTopic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        metadata.updateWithCurrentRequestVersion(initialUpdateResponse, false, time.milliseconds());
        MockClient client = new MockClient(time, metadata);

        KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch assertionDoneLatch = new CountDownLatch(1);
            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
            executor.submit(() -> {
                Assertions.assertThrows(KafkaException.class, producer::initTransactions);
                // Only reached when the assertion above passed.
                assertionDoneLatch.countDown();
            });

            client.waitForRequests(1, 2000L);
            producer.close(Duration.ofMillis(1000L));

            // A failed background assertion never counts the latch down, so the await result
            // must be checked; otherwise the test would pass without verifying anything.
            Assertions.assertTrue(assertionDoneLatch.await(5000L, TimeUnit.MILLISECONDS),
                "initTransactions did not fail with KafkaException after the producer was closed");
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * A metric added to the producer's metrics registry must be exposed on the platform MBean
     * server under the {@code kafka.producer} domain with the configured client id.
     *
     * @throws Exception if the MBean lookup fails
     */
    @Test
    public void testProducerJmxPrefix() throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("client.id", "client-1");

        // try-with-resources guarantees the producer is closed even when the lookup or the
        // assertion fails.
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            MetricName testMetricName = producer.metrics.metricName("test-metric", "grp1", "test metric");
            producer.metrics.addMetric(testMetricName, new Avg());
            Assertions.assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
        }
    }

    /**
     * Creates a {@link ProducerMetadata} for tests, backed by the system clock, a fresh
     * {@link LogContext} and an empty {@link ClusterResourceListeners}.
     *
     * <p>The three arguments are forwarded unchanged as the first three constructor arguments;
     * the fourth constructor argument is fixed at {@code 300000L}. Presumably these are the
     * refresh backoff, a second backoff or expiry setting, the expiry, and the metadata idle
     * time in ms - confirm against the {@code ProducerMetadata} constructor.
     *
     * @param j  first constructor argument
     * @param j2 second constructor argument
     * @param j3 third constructor argument
     * @return the new metadata instance
     */
    public static ProducerMetadata newMetadata(long j, long j2, long j3) {
        final LogContext logContext = new LogContext();
        final ClusterResourceListeners listeners = new ClusterResourceListeners();
        return new ProducerMetadata(j, j2, j3, 300000L, logContext, listeners, Time.SYSTEM);
    }

    /**
     * When no client id is configured, the producer generates one, and each of the four
     * configurable plug-ins (key serializer, value serializer, partitioner, interceptor) must
     * record that same id in {@code CLIENT_IDS}.
     */
    @Test
    public void configurableObjectsShouldSeeGeneratedClientId() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("key.serializer", SerializerForClientId.class.getName());
        props.put("value.serializer", SerializerForClientId.class.getName());
        props.put("partitioner.class", PartitionerForClientId.class.getName());
        props.put("interceptor.classes", ProducerInterceptorForClientId.class.getName());

        // try-with-resources guarantees the producer is closed even if an assertion fails.
        try (KafkaProducer<?, ?> producer = new KafkaProducer<>(props)) {
            Assertions.assertNotNull(producer.getClientId());
            Assertions.assertNotEquals(0, producer.getClientId().length());
            Assertions.assertEquals(4, CLIENT_IDS.size());
            CLIENT_IDS.forEach(id -> Assertions.assertEquals(id, producer.getClientId()));
        }
    }

    /**
     * A config the producer does not read ({@code ssl.protocol}) must be reported by
     * {@code ProducerConfig.unused()} both before and after a producer is built from the config.
     */
    @Test
    public void testUnusedConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9999");
        props.put("ssl.protocol", "TLS");
        ProducerConfig config = new ProducerConfig(
            ProducerConfig.appendSerializerToConfig(props, new StringSerializer(), new StringSerializer()));

        Assertions.assertTrue(config.unused().contains("ssl.protocol"));

        // Typed null casts keep constructor overload resolution unambiguous.
        // try-with-resources closes the producer exactly once, whether or not the body throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config,
                (Serializer<String>) null, (Serializer<String>) null, (ProducerMetadata) null,
                (KafkaClient) null, (ProducerInterceptors<String, String>) null, Time.SYSTEM)) {
            Assertions.assertTrue(config.unused().contains("ssl.protocol"));
        }
    }

    /**
     * A {@link ProducerRecord} built with a {@code null} topic must be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testNullTopicName() {
        byte[] keyBytes = "key".getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = "value".getBytes(StandardCharsets.UTF_8);
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> new ProducerRecord<>(null, 1, keyBytes, valueBytes));
    }

    /**
     * Sends a record to the topic {@code "topic abc"} and checks how the failure is surfaced:
     * the callback must receive a non-null exception together with a non-null
     * {@code RecordMetadata} that carries the topic name and "unknown" values everywhere else
     * (no offset, no timestamp, {@code -1} sizes and partition), and the interceptor's
     * acknowledgement counter must read 1 afterwards.
     */
    @Test
    public void testCallbackAndInterceptorHandleError() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "1000");
        configs.put("interceptor.classes", MockProducerInterceptor.class.getName());
        configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "something");

        MockTime time = new MockTime();
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        // NOTE(review): presumably the embedded space is what makes this topic name fail - confirm.
        String topic = "topic abc";
        ProducerInterceptors<String, String> interceptors =
                new ProducerInterceptors<>(Collections.singletonList(new MockProducerInterceptor()));

        // try-with-resources guarantees the producer is closed whether or not an assertion fails.
        try (KafkaProducer<String, String> producer = kafkaProducer(
                configs, new StringSerializer(), new StringSerializer(), metadata, client, interceptors, time)) {
            // The same variable feeds the record and the callback's expectation so they cannot drift apart.
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "HelloKafka");
            producer.send(record, (recordMetadata, exception) -> {
                Assertions.assertNotNull(exception);
                Assertions.assertNotNull(recordMetadata);
                Assertions.assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");
                Assertions.assertEquals(topic, recordMetadata.topic());
                Assertions.assertFalse(recordMetadata.hasOffset());
                Assertions.assertEquals(-1L, recordMetadata.offset());
                Assertions.assertFalse(recordMetadata.hasTimestamp());
                Assertions.assertEquals(-1L, recordMetadata.timestamp());
                Assertions.assertEquals(-1, recordMetadata.serializedKeySize());
                Assertions.assertEquals(-1, recordMetadata.serializedValueSize());
                Assertions.assertEquals(-1, recordMetadata.partition());
            });
            Assertions.assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
        }
    }

    /**
     * With {@code BuggyPartitioner} configured as the partitioner, {@code send} must fail with an
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): going by the test name, {@code BuggyPartitioner} is expected to return a
     * negative partition; its definition is outside this method - confirm.
     */
    @Test
    public void negativePartitionShouldThrow() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("partitioner.class", BuggyPartitioner.class.getName());

        MockTime time = new MockTime(1L);
        MetadataResponse initialUpdate =
                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        ProducerMetadata metadata = newMetadata(0L, 0L, Long.MAX_VALUE);
        MockClient client = new MockClient(time, metadata);
        // Seed the mock client with metadata for "topic" before the producer is created.
        client.updateMetadata(initialUpdate);

        // try-with-resources guarantees the producer is closed whether or not the assertion fails.
        try (KafkaProducer<String, String> producer = kafkaProducer(
                configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> producer.send(new ProducerRecord<>("topic", "key", "value")));
        }
    }

    /**
     * Sends one record through a producer built on mocked collaborators and checks that:
     * the future returned by {@code send} is the one produced by the stubbed append, that future
     * is not yet done, and the record's partition ({@code foo-0}) was passed to
     * {@code TransactionManager.maybeAddPartition}.
     *
     * @throws Exception propagated from the stubbing helper or the mocked calls
     */
    @Test
    public void testPartitionAddedToTransaction() throws Exception {
        KafkaProducerTestContext<String> ctx =
                new KafkaProducerTestContext<>(this.testInfo, new StringSerializer());
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Cluster cluster = TestUtils.singletonCluster("foo", 1);

        Mockito.when(ctx.sender.isRunning()).thenReturn(true);
        Mockito.when(ctx.metadata.fetch()).thenReturn(cluster);

        long timestamp = ctx.time.milliseconds();
        ProducerRecord<String, String> record = new ProducerRecord<>("foo", null, timestamp, "key", "value");
        FutureRecordMetadata expectedFuture = expectAppend(ctx, record, topicPartition, cluster);

        // try-with-resources guarantees the producer is closed whether or not an assertion fails.
        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
            Assertions.assertEquals(expectedFuture, producer.send(record));
            Assertions.assertFalse(expectedFuture.isDone());
            Mockito.verify(ctx.transactionManager).maybeAddPartition(topicPartition);
        }
    }

    /**
     * Exercises the path where the first append attempt (for {@code foo-0}) is answered with a
     * result asking for a retry and the second attempt lands on {@code foo-1}, as stubbed by
     * {@code expectAppendWithAbortForNewBatch}.
     *
     * <p>Checks that {@code send} returns the stubbed future (not yet done), that the partitioner's
     * {@code onNewBatch("foo", cluster, 0)} was invoked, and that only the partition finally used
     * ({@code foo-1}) - never the abandoned one ({@code foo-0}) - was passed to
     * {@code TransactionManager.maybeAddPartition}.
     *
     * @throws Exception propagated from the stubbing helper or the mocked calls
     */
    @Test
    public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
        KafkaProducerTestContext<String> ctx =
                new KafkaProducerTestContext<>(this.testInfo, new StringSerializer());
        TopicPartition firstChoice = new TopicPartition("foo", 0);
        TopicPartition retryChoice = new TopicPartition("foo", 1);
        Cluster cluster = TestUtils.singletonCluster("foo", 2);

        Mockito.when(ctx.sender.isRunning()).thenReturn(true);
        Mockito.when(ctx.metadata.fetch()).thenReturn(cluster);

        long timestamp = ctx.time.milliseconds();
        ProducerRecord<String, String> record = new ProducerRecord<>("foo", null, timestamp, "key", "value");
        FutureRecordMetadata expectedFuture =
                expectAppendWithAbortForNewBatch(ctx, record, firstChoice, retryChoice, cluster);

        // try-with-resources guarantees the producer is closed whether or not an assertion fails.
        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
            Assertions.assertEquals(expectedFuture, producer.send(record));
            Assertions.assertFalse(expectedFuture.isDone());
            Mockito.verify(ctx.partitioner).onNewBatch("foo", cluster, 0);
            Mockito.verify(ctx.transactionManager, Mockito.never()).maybeAddPartition(firstChoice);
            Mockito.verify(ctx.transactionManager).maybeAddPartition(retryChoice);
        }
    }

    /**
     * Stubs the context's partitioner and accumulator mocks for a single append of
     * {@code producerRecord} to {@code topicPartition}.
     *
     * <p>The partitioner mock is told to return {@code topicPartition.partition()}; the accumulator
     * mock's {@code append} is told to push that partition into the supplied
     * {@code AppendCallbacks} and to answer with a {@code RecordAppendResult} wrapping the returned
     * future.
     *
     * @param kafkaProducerTestContext holder of the mocks being stubbed
     * @param producerRecord           record whose key/value are serialized for the argument matchers
     * @param topicPartition           partition the record is expected to be appended to
     * @param cluster                  cluster handed to the partitioner mock
     * @return the future that the stubbed {@code append} will hand back
     * @throws InterruptedException declared by the stubbed {@code append} call
     */
    private <T> FutureRecordMetadata expectAppend(
            KafkaProducerTestContext<T> kafkaProducerTestContext,
            ProducerRecord<T, T> producerRecord,
            TopicPartition topicPartition,
            Cluster cluster) throws InterruptedException {
        byte[] keyBytes = kafkaProducerTestContext.serializer.serialize("topic", producerRecord.key());
        byte[] valueBytes = kafkaProducerTestContext.serializer.serialize("topic", producerRecord.value());

        // Fall back to the context clock when the record carries no explicit timestamp.
        long timestamp;
        if (producerRecord.timestamp() == null) {
            timestamp = kafkaProducerTestContext.time.milliseconds();
        } else {
            timestamp = producerRecord.timestamp();
        }

        ProduceRequestResult requestResult = new ProduceRequestResult(topicPartition);
        FutureRecordMetadata future = new FutureRecordMetadata(
                requestResult, 5, timestamp, keyBytes.length, valueBytes.length, kafkaProducerTestContext.time);

        Mockito.when(kafkaProducerTestContext.partitioner.partition(
                        topicPartition.topic(), producerRecord.key(), keyBytes, producerRecord.value(), valueBytes, cluster))
                .thenReturn(topicPartition.partition());

        // Matchers stay inline and in argument order: Mockito records them positionally.
        Mockito.when(kafkaProducerTestContext.accumulator.append(
                        ArgumentMatchers.eq(topicPartition.topic()),
                        ArgumentMatchers.eq(topicPartition.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(invocation -> {
                    // Argument index 6 is the AppendCallbacks slot matched above.
                    RecordAccumulator.AppendCallbacks callbacks =
                            (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
                    callbacks.setPartition(topicPartition.partition());
                    return new RecordAccumulator.RecordAppendResult(future, false, false, false, 0);
                });
        return future;
    }

    /**
     * Stubs the context's partitioner and accumulator mocks for a two-step append: a first attempt
     * against {@code topicPartition} followed by a second attempt against {@code topicPartition2}.
     *
     * <p>The partitioner mock returns {@code topicPartition.partition()} on its first call and
     * {@code topicPartition2.partition()} afterwards. The first stubbed {@code append} (ninth
     * argument {@code true}) answers with a result that has no future; the second (ninth argument
     * {@code false}) answers with a result wrapping the returned future. Each answer pushes its own
     * partition into the supplied {@code AppendCallbacks}.
     *
     * <p>NOTE(review): the boolean set in the first result is presumably the "abort for new batch"
     * flag and the one set in the second is presumably "new batch created" - confirm against
     * {@code RecordAccumulator.RecordAppendResult}.
     *
     * @param kafkaProducerTestContext holder of the mocks being stubbed
     * @param producerRecord           record whose key/value are serialized for the argument matchers
     * @param topicPartition           partition chosen for the first attempt
     * @param topicPartition2          partition chosen for the second attempt
     * @param cluster                  cluster handed to the partitioner mock
     * @return the future that the second stubbed {@code append} will hand back
     * @throws InterruptedException declared by the stubbed {@code append} calls
     */
    private <T> FutureRecordMetadata expectAppendWithAbortForNewBatch(
            KafkaProducerTestContext<T> kafkaProducerTestContext,
            ProducerRecord<T, T> producerRecord,
            TopicPartition topicPartition,
            TopicPartition topicPartition2,
            Cluster cluster) throws InterruptedException {
        byte[] keyBytes = kafkaProducerTestContext.serializer.serialize("topic", producerRecord.key());
        byte[] valueBytes = kafkaProducerTestContext.serializer.serialize("topic", producerRecord.value());

        // Fall back to the context clock when the record carries no explicit timestamp.
        long timestamp;
        if (producerRecord.timestamp() == null) {
            timestamp = kafkaProducerTestContext.time.milliseconds();
        } else {
            timestamp = producerRecord.timestamp();
        }

        ProduceRequestResult requestResult = new ProduceRequestResult(topicPartition2);
        FutureRecordMetadata future = new FutureRecordMetadata(
                requestResult, 0, timestamp, keyBytes.length, valueBytes.length, kafkaProducerTestContext.time);

        Mockito.when(kafkaProducerTestContext.partitioner.partition(
                        topicPartition.topic(), producerRecord.key(), keyBytes, producerRecord.value(), valueBytes, cluster))
                .thenReturn(topicPartition.partition())
                .thenReturn(topicPartition2.partition());

        // First attempt. Matchers stay inline and in argument order: Mockito records them positionally.
        Mockito.when(kafkaProducerTestContext.accumulator.append(
                        ArgumentMatchers.eq(topicPartition.topic()),
                        ArgumentMatchers.eq(topicPartition.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(firstInvocation -> {
                    // Argument index 6 is the AppendCallbacks slot matched above.
                    RecordAccumulator.AppendCallbacks callbacks =
                            (RecordAccumulator.AppendCallbacks) firstInvocation.getArguments()[6];
                    callbacks.setPartition(topicPartition.partition());
                    return new RecordAccumulator.RecordAppendResult(null, false, false, true, 0);
                });

        // Second attempt, against the other partition.
        Mockito.when(kafkaProducerTestContext.accumulator.append(
                        ArgumentMatchers.eq(topicPartition2.topic()),
                        ArgumentMatchers.eq(topicPartition2.partition()),
                        ArgumentMatchers.eq(timestamp),
                        ArgumentMatchers.eq(keyBytes),
                        ArgumentMatchers.eq(valueBytes),
                        ArgumentMatchers.eq(Record.EMPTY_HEADERS),
                        ArgumentMatchers.any(RecordAccumulator.AppendCallbacks.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.eq(false),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any()))
                .thenAnswer(secondInvocation -> {
                    RecordAccumulator.AppendCallbacks callbacks =
                            (RecordAccumulator.AppendCallbacks) secondInvocation.getArguments()[6];
                    callbacks.setPartition(topicPartition2.partition());
                    return new RecordAccumulator.RecordAppendResult(future, false, true, false, 0);
                });
        return future;
    }

    /**
     * Checks producer construction against two timeout combinations, each with
     * {@code delivery.timeout.ms = 1000} and {@code request.timeout.ms = 1}:
     * {@code linger.ms = 1000} must make the constructor throw a {@link KafkaException},
     * while {@code linger.ms = 999} must construct (and close) cleanly.
     */
    @Test
    void testDeliveryTimeoutAndLingerMsConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("client.id", "testDeliveryTimeoutAndLingerMsConfig");
        configs.put("bootstrap.servers", "localhost:9999");

        // Scenario 1: linger.ms + request.timeout.ms = 1001, which exceeds delivery.timeout.ms.
        configs.put("delivery.timeout.ms", 1000);
        configs.put("linger.ms", 1000);
        configs.put("request.timeout.ms", 1);
        Assertions.assertThrows(KafkaException.class,
                () -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()));

        // Scenario 2: linger.ms + request.timeout.ms = 1000, exactly delivery.timeout.ms.
        configs.put("delivery.timeout.ms", 1000);
        configs.put("linger.ms", 999);
        configs.put("request.timeout.ms", 1);
        Assertions.assertDoesNotThrow(
                () -> new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer()).close());
    }
}
