package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/TransactionManagerTest.class */
public class TransactionManagerTest {
    private static final int MAX_REQUEST_SIZE = 1048576;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100;
    private final String transactionalId = "foobar";
    private final int transactionTimeoutMs = 1121;
    private final String topic = "test";
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final LogContext logContext = new LogContext();
    private final MockTime time = new MockTime();
    private final ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, Long.MAX_VALUE, this.logContext, new ClusterResourceListeners(), this.time);
    private final MockClient client = new MockClient((Time) this.time, (Metadata) this.metadata);
    private final ApiVersions apiVersions = new ApiVersions();
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;

    @Before
    public void setup() {
        this.metadata.add("test", this.time.milliseconds());
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        this.brokerNode = new Node(0, "localhost", 2211);
        initializeTransactionManager(Optional.of("foobar"));
    }

    private void initializeTransactionManager(Optional<String> optional) {
        Metrics metrics = new Metrics(this.time);
        this.apiVersions.update("0", new NodeApiVersions(Arrays.asList(new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3), new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7))));
        this.transactionManager = new TransactionManager(this.logContext, optional.orElse(null), 1121, 100L, this.apiVersions);
        this.brokerNode = new Node(0, "localhost", 2211);
        this.accumulator = new RecordAccumulator(this.logContext, 16384, CompressionType.NONE, 0, 0L, 3000, metrics, "producer-metrics", this.time, this.apiVersions, this.transactionManager, new BufferPool(1048576L, 16384, metrics, this.time, "producer-metrics"));
        this.sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, 1000, 50L, this.transactionManager, this.apiVersions);
    }

    @Test
    public void testSenderShutdownWithPendingTransactions() throws Exception {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        this.sender.initiateClose();
        this.sender.runOnce();
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        beginCommit.getClass();
        runUntil(beginCommit::isCompleted);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
    }

    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        this.transactionManager.beginCommit();
        Assert.assertNull(this.transactionManager.nextRequest(true));
        Assert.assertTrue(this.transactionManager.nextRequest(false).isEndTxn());
    }

    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoProducerId() {
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        initializeTransactionManager(Optional.empty());
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        initializeTransactionManager(Optional.empty());
        this.transactionManager.transitionToFatalError(new KafkaException());
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterAbortableError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.transitionToAbortableError(new KafkaException());
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.transitionToFatalError(new KafkaException());
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        doInitTransactions(13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasOngoingTransaction);
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        this.transactionManager.beginAbort();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasOngoingTransaction());
        });
    }

    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        doInitTransactions(13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        this.transactionManager.beginCommit();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasOngoingTransaction());
        });
    }

    @Test
    public void testHasOngoingTransactionAbortableError() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        doInitTransactions(13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        this.transactionManager.transitionToAbortableError(new KafkaException());
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginAbort();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasOngoingTransaction());
        });
    }

    @Test
    public void testHasOngoingTransactionFatalError() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        doInitTransactions(13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        this.transactionManager.transitionToFatalError(new KafkaException());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testMaybeAddPartitionToTransaction() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertTrue(this.transactionManager.isPartitionPendingAdd(topicPartition));
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        Assert.assertTrue(this.transactionManager.hasPartitionsToAdd());
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.transactionManager.isPartitionPendingAdd(topicPartition));
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertTrue(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertFalse(this.transactionManager.isPartitionPendingAdd(topicPartition));
    }

    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertTrue(this.transactionManager.isPartitionPendingAdd(topicPartition));
        prepareAddPartitionsToTxn(topicPartition, Errors.CONCURRENT_TRANSACTIONS);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        TransactionManager.TxnRequestHandler nextRequest = this.transactionManager.nextRequest(false);
        Assert.assertNotNull(nextRequest);
        Assert.assertEquals(20L, nextRequest.retryBackoffMs());
    }

    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertTrue(this.transactionManager.isPartitionPendingAdd(topicPartition));
        prepareAddPartitionsToTxn(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        TransactionManager.TxnRequestHandler nextRequest = this.transactionManager.nextRequest(false);
        Assert.assertNotNull(nextRequest);
        Assert.assertEquals(100L, nextRequest.retryBackoffMs());
    }

    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        Assert.assertTrue(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertTrue(this.transactionManager.isPartitionPendingAdd(topicPartition));
        prepareAddPartitionsToTxn(topicPartition, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(topicPartition));
        });
        TopicPartition topicPartition2 = new TopicPartition("foo", 1);
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition2);
        prepareAddPartitionsToTxn(topicPartition2, Errors.CONCURRENT_TRANSACTIONS);
        TransactionManager.TxnRequestHandler nextRequest = this.transactionManager.nextRequest(false);
        Assert.assertNotNull(nextRequest);
        Assert.assertEquals(100L, nextRequest.retryBackoffMs());
    }

    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.transitionToAbortableError(new KafkaException());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.transitionToFatalError(new KafkaException());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.transitionToAbortableError(new KafkaException());
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasInFlightRequest);
        this.transactionManager.transitionToAbortableError(new KafkaException());
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.transitionToFatalError(new KafkaException());
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasInFlightRequest);
        this.transactionManager.transitionToFatalError(new KafkaException());
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        this.transactionManager.transitionToAbortableError(new KafkaException());
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        this.transactionManager.transitionToFatalError(new KafkaException());
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
    }

    @Test
    public void testDefaultSequenceNumber() {
        initializeTransactionManager(Optional.empty());
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 0L);
        this.transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 3L);
    }

    @Test
    public void testBumpEpochAndResetSequenceNumbersAfterUnknownProducerId() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, (short) 1);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "3");
        ProducerBatch writeIdempotentBatchWithValue4 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "4");
        ProducerBatch writeIdempotentBatchWithValue5 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "5");
        Assert.assertEquals(5L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        long milliseconds = this.time.milliseconds();
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, partitionResponse);
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L), writeIdempotentBatchWithValue2));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertEquals(2L, writeIdempotentBatchWithValue2.producerEpoch());
        Assert.assertEquals(0L, writeIdempotentBatchWithValue2.baseSequence());
        Assert.assertEquals(1L, writeIdempotentBatchWithValue3.baseSequence());
        Assert.assertEquals(2L, writeIdempotentBatchWithValue4.baseSequence());
        Assert.assertEquals(3L, writeIdempotentBatchWithValue5.baseSequence());
    }

    @Test
    public void testBatchFailureAfterProducerReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, Short.MAX_VALUE);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "1");
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue2, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        ProducerBatch writeIdempotentBatchWithValue4 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "2");
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 400L), writeIdempotentBatchWithValue));
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue2, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(1L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(writeIdempotentBatchWithValue3, this.transactionManager.nextBatchBySequence(this.tp0));
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        Assert.assertEquals(writeIdempotentBatchWithValue4, this.transactionManager.nextBatchBySequence(this.tp1));
    }

    @Test
    public void testBatchCompletedAfterProducerReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, Short.MAX_VALUE);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        this.transactionManager.requestEpochBumpForPartition(this.tp1);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        initializeIdempotentProducerId(13132L, (short) 0);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, this.time.milliseconds(), 0L));
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(0L, this.transactionManager.lastAckedSequence(this.tp0).getAsInt());
        Assert.assertEquals(writeIdempotentBatchWithValue2, this.transactionManager.nextBatchBySequence(this.tp0));
        Assert.assertEquals(32767L, this.transactionManager.nextBatchBySequence(this.tp0).producerEpoch());
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue2, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, this.time.milliseconds(), 0L));
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertFalse(this.transactionManager.lastAckedSequence(this.tp0).isPresent());
        Assert.assertNull(this.transactionManager.nextBatchBySequence(this.tp0));
    }

    private ProducerBatch writeIdempotentBatchWithValue(TransactionManager transactionManager, TopicPartition topicPartition, String str) {
        int intValue = transactionManager.sequenceNumber(topicPartition).intValue();
        transactionManager.incrementSequenceNumber(topicPartition, 1);
        ProducerBatch batchWithValue = batchWithValue(topicPartition, str);
        batchWithValue.setProducerState(transactionManager.producerIdAndEpoch(), intValue, false);
        transactionManager.addInFlightBatch(batchWithValue);
        batchWithValue.close();
        return batchWithValue;
    }

    private ProducerBatch batchWithValue(TopicPartition topicPartition, String str) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(64), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        long milliseconds = this.time.milliseconds();
        ProducerBatch producerBatch = new ProducerBatch(topicPartition, builder, milliseconds);
        producerBatch.tryAppend(milliseconds, new byte[0], str.getBytes(), new Header[0], (Callback) null, milliseconds);
        return producerBatch;
    }

    @Test
    public void testSequenceNumberOverflow() {
        initializeTransactionManager(Optional.empty());
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 0L);
        this.transactionManager.incrementSequenceNumber(this.tp0, MAX_RETRIES);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 2147483647L);
        this.transactionManager.incrementSequenceNumber(this.tp0, 100);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 99L);
        this.transactionManager.incrementSequenceNumber(this.tp0, MAX_RETRIES);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 98L);
    }

    @Test
    public void testProducerIdReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(15L, Short.MAX_VALUE);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 0L);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp1).intValue(), 0L);
        this.transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 3L);
        this.transactionManager.incrementSequenceNumber(this.tp1, 3);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp1).intValue(), 3L);
        this.transactionManager.requestEpochBumpForPartition(this.tp0);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp0).intValue(), 0L);
        Assert.assertEquals(this.transactionManager.sequenceNumber(this.tp1).intValue(), 3L);
    }

    @Test
    public void testBasicTransaction() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(appendToAccumulator.isDone());
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup"));
        Assert.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short) 1);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasPendingOffsetCommits);
        Assert.assertFalse(sendOffsetsToTransaction.isCompleted());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp1, Errors.NONE);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, hashMap2);
        Assert.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        });
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPendingOffsetCommits());
        });
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        this.transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasOngoingTransaction());
        });
        Assert.assertFalse(this.transactionManager.isCompleting());
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    @Test
    public void testDisconnectAndRetry() {
        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, true, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) == null);
        });
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
    }

    @Test
    public void testInitializeTransactionsTwiceRaisesError() {
        doInitTransactions(13131L, (short) 1);
        Assert.assertTrue(this.transactionManager.hasProducerId());
        Assert.assertThrows(KafkaException.class, () -> {
            this.transactionManager.initializeTransactions();
        });
    }

    @Test
    public void testUnsupportedFindCoordinator() {
        this.transactionManager.initializeTransactions();
        this.client.prepareUnsupportedVersionResponse(abstractRequest -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) abstractRequest;
            Assert.assertEquals(FindCoordinatorRequest.CoordinatorType.forId(findCoordinatorRequest.data().keyType()), FindCoordinatorRequest.CoordinatorType.TRANSACTION);
            Assert.assertEquals(findCoordinatorRequest.data().key(), "foobar");
            return true;
        });
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasFatalError);
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    @Test
    public void testUnsupportedInitTransactions() {
        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertFalse(this.transactionManager.hasError());
        this.client.prepareUnsupportedVersionResponse(abstractRequest -> {
            Assert.assertEquals(((InitProducerIdRequest) abstractRequest).data.transactionalId(), "foobar");
            Assert.assertEquals(r0.data.transactionTimeoutMs(), 1121L);
            return true;
        });
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasFatalError);
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        prepareTxnOffsetCommitResponse("consumer", 13131L, (short) 1, Collections.singletonMap(topicPartition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof UnsupportedForMessageFormatException);
        assertFatalError(UnsupportedForMessageFormatException.class);
    }

    @Test
    public void testFencedInstanceIdInTxnOffsetCommitByGroupMetadata() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer", 5, "fenced_member", Optional.of("instance")));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        });
        this.client.prepareResponse(abstractRequest -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) abstractRequest;
            Assert.assertEquals("consumer", txnOffsetCommitRequest.data.groupId());
            Assert.assertEquals(13131L, txnOffsetCommitRequest.data.producerId());
            Assert.assertEquals(1L, txnOffsetCommitRequest.data.producerEpoch());
            return txnOffsetCommitRequest.data.groupInstanceId().equals("instance") && !txnOffsetCommitRequest.data.memberId().equals("member");
        }, (AbstractResponse) new TxnOffsetCommitResponse(0, Collections.singletonMap(topicPartition, Errors.FENCED_INSTANCE_ID)));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof FencedInstanceIdException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof FencedInstanceIdException);
        assertAbortableError(FencedInstanceIdException.class);
    }

    @Test
    public void testUnknownMemberIdInTxnOffsetCommitByGroupMetadata() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer", 5, "unknownMember", Optional.empty()));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        });
        this.client.prepareResponse(abstractRequest -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) abstractRequest;
            Assert.assertEquals("consumer", txnOffsetCommitRequest.data.groupId());
            Assert.assertEquals(13131L, txnOffsetCommitRequest.data.producerId());
            Assert.assertEquals(1L, txnOffsetCommitRequest.data.producerEpoch());
            return !txnOffsetCommitRequest.data.memberId().equals("member");
        }, (AbstractResponse) new TxnOffsetCommitResponse(0, Collections.singletonMap(topicPartition, Errors.UNKNOWN_MEMBER_ID)));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof CommitFailedException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof CommitFailedException);
        assertAbortableError(CommitFailedException.class);
    }

    @Test
    public void testIllegalGenerationInTxnOffsetCommitByGroupMetadata() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer", 1, "", Optional.empty()));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        });
        prepareTxnOffsetCommitResponse("consumer", 13131L, (short) 1, Collections.singletonMap(topicPartition, Errors.ILLEGAL_GENERATION));
        this.client.prepareResponse(abstractRequest -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) abstractRequest;
            Assert.assertEquals("consumer", txnOffsetCommitRequest.data.groupId());
            Assert.assertEquals(13131L, txnOffsetCommitRequest.data.producerId());
            Assert.assertEquals(1L, txnOffsetCommitRequest.data.producerEpoch());
            return txnOffsetCommitRequest.data.generationId() != 5;
        }, (AbstractResponse) new TxnOffsetCommitResponse(0, Collections.singletonMap(topicPartition, Errors.ILLEGAL_GENERATION)));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof CommitFailedException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof CommitFailedException);
        assertAbortableError(CommitFailedException.class);
    }

    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareInitPidResponse(Errors.NONE, true, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) == null);
        });
        Assert.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse(initializeTransactions.isCompleted());
        Assert.assertFalse(this.transactionManager.hasProducerId());
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse(initializeTransactions.isCompleted());
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        initializeTransactions.getClass();
        runUntil(initializeTransactions::isCompleted);
        Assert.assertTrue(initializeTransactions.isCompleted());
        Assert.assertTrue(this.transactionManager.hasProducerId());
        Assert.assertEquals(13131L, this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(1L, this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.client.disconnect(this.brokerNode.idString());
        this.client.blackout(this.brokerNode, 100L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) == null);
        });
        this.time.sleep(110L);
        Assert.assertFalse(initializeTransactions.isCompleted());
        Assert.assertFalse(this.transactionManager.hasProducerId());
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse(initializeTransactions.isCompleted());
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        initializeTransactions.getClass();
        runUntil(initializeTransactions::isCompleted);
        Assert.assertTrue(this.transactionManager.hasProducerId());
        Assert.assertEquals(13131L, this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(1L, this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareInitPidResponse(Errors.NOT_COORDINATOR, false, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) == null);
        });
        Assert.assertFalse(initializeTransactions.isCompleted());
        Assert.assertFalse(this.transactionManager.hasProducerId());
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse(initializeTransactions.isCompleted());
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        initializeTransactions.getClass();
        runUntil(initializeTransactions::isCompleted);
        Assert.assertTrue(this.transactionManager.hasProducerId());
        Assert.assertEquals(13131L, this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(1L, this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertFalse(initializeTransactions.isSuccessful());
        Assert.assertTrue(initializeTransactions.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, 13131L, (short) -1);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(initializeTransactions.isCompleted());
        Assert.assertFalse(initializeTransactions.isSuccessful());
        Assert.assertTrue(initializeTransactions.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof GroupAuthorizationException);
        sendOffsetsToTransaction.getClass();
        runUntil(sendOffsetsToTransaction::isCompleted);
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof GroupAuthorizationException);
        Assert.assertEquals("consumer", sendOffsetsToTransaction.error().groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        prepareTxnOffsetCommitResponse("consumer", 13131L, (short) 1, Collections.singletonMap(topicPartition, Errors.GROUP_AUTHORIZATION_FAILED));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof GroupAuthorizationException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof GroupAuthorizationException);
        Assert.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        Assert.assertEquals("consumer", sendOffsetsToTransaction.error().groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer"));
        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, "consumer", 13131L, (short) 1);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("consumer"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        prepareTxnOffsetCommitResponse("consumer", 13131L, (short) 1, Collections.singletonMap(topicPartition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assert.assertTrue(sendOffsetsToTransaction.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTopicAuthorizationFailureInAddPartitions() throws InterruptedException {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("bar", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition2);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(topicPartition);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(topicPartition2);
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        hashMap.put(topicPartition2, Errors.OPERATION_NOT_ATTEMPTED);
        prepareAddPartitionsToTxn(hashMap);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof TopicAuthorizationException);
        Assert.assertFalse(this.transactionManager.isPartitionPendingAdd(topicPartition));
        Assert.assertFalse(this.transactionManager.isPartitionPendingAdd(topicPartition2));
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition2));
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertEquals(Collections.singleton(topicPartition.topic()), this.transactionManager.lastError().unauthorizedTopics());
        assertAbortableError(TopicAuthorizationException.class);
        this.sender.runOnce();
        TestUtils.assertFutureThrows(appendToAccumulator, KafkaException.class);
        TestUtils.assertFutureThrows(appendToAccumulator2, KafkaException.class);
    }

    @Test
    public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() throws InterruptedException {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("bar", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition2);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(topicPartition);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(topicPartition2);
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        this.sender.runOnce();
        Assert.assertFalse(this.transactionManager.hasError());
        Assert.assertFalse(beginCommit.isCompleted());
        Assert.assertFalse(appendToAccumulator.isDone());
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        hashMap.put(topicPartition2, Errors.OPERATION_NOT_ATTEMPTED);
        this.client.respond(abstractRequest -> {
            Assert.assertEquals(new HashSet(((AddPartitionsToTxnRequest) abstractRequest).partitions()), new HashSet(hashMap.keySet()));
            return true;
        }, (AbstractResponse) new AddPartitionsToTxnResponse(0, hashMap));
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.hasError());
        Assert.assertFalse(beginCommit.isCompleted());
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        this.sender.runOnce();
        Assert.assertTrue(beginCommit.isCompleted());
        TestUtils.assertFutureThrows(appendToAccumulator, KafkaException.class);
        TestUtils.assertFutureThrows(appendToAccumulator2, KafkaException.class);
        Assert.assertTrue(beginCommit.error() instanceof TopicAuthorizationException);
    }

    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(topicPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        this.transactionManager.beginAbort();
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        assertFutureFailed(appendToAccumulator);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::isReady);
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        Assert.assertNotNull(appendToAccumulator2.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        TransactionManager transactionManager2 = this.transactionManager;
        transactionManager2.getClass();
        runUntil(transactionManager2::isReady);
    }

    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(topicPartition);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(topicPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        Assert.assertTrue(this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        this.transactionManager.beginAbort();
        TransactionManager transactionManager2 = this.transactionManager;
        transactionManager2.getClass();
        runUntil(transactionManager2::isReady);
        assertFutureFailed(appendToAccumulator);
        assertFutureFailed(appendToAccumulator2);
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertNotNull(appendToAccumulator3.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        TransactionManager transactionManager3 = this.transactionManager;
        transactionManager3.getClass();
        runUntil(transactionManager3::isReady);
    }

    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        this.accumulator.beginFlush();
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertTrue(this.accumulator.hasIncomplete());
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(topicPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        Assert.assertTrue(this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse(this.transactionManager.isPartitionAdded(topicPartition));
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        assertFutureFailed(appendToAccumulator2);
        Assert.assertNotNull(appendToAccumulator.get());
        Assert.assertTrue(appendToAccumulator.isDone());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        this.transactionManager.beginAbort();
        TransactionManager transactionManager2 = this.transactionManager;
        transactionManager2.getClass();
        runUntil(transactionManager2::isReady);
        Assert.assertTrue(this.transactionManager.isReady());
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertFalse(this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertNotNull(appendToAccumulator3.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        TransactionManager transactionManager3 = this.transactionManager;
        transactionManager3.getClass();
        runUntil(transactionManager3::isReady);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(topicPartition);
        prepareAddPartitionsToTxn(topicPartition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(beginCommit.isCompleted());
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        Assert.assertFalse(beginCommit.isCompleted());
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        Assert.assertTrue(this.transactionManager.isCompleting());
        beginCommit.getClass();
        runUntil(beginCommit::isCompleted);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp1));
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp1));
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertTrue(appendToAccumulator2.isDone());
    }

    @Test
    public void testProducerFencedException() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertTrue(this.transactionManager.hasError());
        try {
            appendToAccumulator.get();
            Assert.fail("Expected to get a ExecutionException from the response");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof ProducerFencedException);
        }
        Assert.assertThrows(ProducerFencedException.class, () -> {
            this.transactionManager.beginTransaction();
        });
        Assert.assertThrows(ProducerFencedException.class, () -> {
            this.transactionManager.beginCommit();
        });
        Assert.assertThrows(ProducerFencedException.class, () -> {
            this.transactionManager.beginAbort();
        });
        Assert.assertThrows(ProducerFencedException.class, () -> {
            this.transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId"));
        });
    }

    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short) 1);
        beginCommit.getClass();
        runUntil(beginCommit::isCompleted);
        try {
            beginCommit.await();
            Assert.fail();
        } catch (KafkaException e) {
        }
        try {
            appendToAccumulator.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof OutOfOrderSequenceException);
        }
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
    }

    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        Assert.assertFalse(appendToAccumulator(this.tp0).isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short) 1);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
    }

    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(!this.accumulator.hasUndrained());
        });
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        Assert.assertTrue(this.transactionManager.isAborting());
        Assert.assertFalse(this.transactionManager.hasError());
        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertTrue(this.transactionManager.isAborting());
        Assert.assertFalse(this.transactionManager.hasError());
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
    }

    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertTrue(this.accumulator.hasUndrained());
        this.transactionManager.beginCommit();
        runUntil(() -> {
            return Boolean.valueOf(!this.accumulator.hasUndrained());
        });
        Assert.assertTrue(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        Assert.assertFalse(appendToAccumulator.isDone());
        AtomicInteger atomicInteger = new AtomicInteger(0);
        runUntil(() -> {
            return Boolean.valueOf(atomicInteger.incrementAndGet() >= 4);
        });
        Assert.assertFalse(this.accumulator.hasUndrained());
        Assert.assertTrue(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        Assert.assertFalse(appendToAccumulator.isDone());
        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertFalse(this.accumulator.hasUndrained());
        Assert.assertFalse(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        TransactionManager transactionManager2 = this.transactionManager;
        transactionManager2.getClass();
        runUntil(transactionManager2::isReady);
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
    }

    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(!this.transactionManager.hasPartitionsToAdd());
        });
        Assert.assertTrue(this.accumulator.hasUndrained());
        this.accumulator.beginFlush();
        runUntil(() -> {
            return Boolean.valueOf(!this.accumulator.hasUndrained());
        });
        Assert.assertFalse(this.accumulator.hasUndrained());
        Assert.assertTrue(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        this.transactionManager.beginCommit();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        runUntil(() -> {
            return Boolean.valueOf(atomicInteger.incrementAndGet() >= 4);
        });
        Assert.assertFalse(this.accumulator.hasUndrained());
        Assert.assertTrue(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        Assert.assertFalse(appendToAccumulator.isDone());
        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertFalse(this.accumulator.hasUndrained());
        Assert.assertFalse(this.accumulator.hasIncomplete());
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        TransactionManager transactionManager2 = this.transactionManager;
        transactionManager2.getClass();
        runUntil(transactionManager2::isReady);
        Assert.assertFalse(this.transactionManager.hasInFlightRequest());
    }

    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        Assert.assertFalse(appendToAccumulator(this.tp0).isDone());
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasInFlightRequest);
        this.transactionManager.transitionToAbortableError(new KafkaException());
        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) == null);
        });
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        try {
            appendToAccumulator.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof KafkaException);
        }
    }

    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, (short) 1, 13131L);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        try {
            appendToAccumulator.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof KafkaException);
        }
    }

    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short) 1);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        Assert.assertEquals(this.tp0.topic(), ((RecordMetadata) appendToAccumulator.get()).topic());
    }

    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
    }

    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
    }

    @Test
    public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
    }

    private void testRetriableErrorInTxnOffsetCommit(Errors errors) {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new OffsetAndMetadata(1L));
        hashMap.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(sendOffsetsToTransaction.isCompleted());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, Errors.NONE);
        hashMap2.put(this.tp1, errors);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, hashMap2);
        Assert.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        });
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasPendingOffsetCommits);
        Assert.assertFalse(sendOffsetsToTransaction.isCompleted());
        hashMap2.put(this.tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, hashMap2);
        sendOffsetsToTransaction.getClass();
        runUntil(sendOffsetsToTransaction::isCompleted);
        Assert.assertTrue(sendOffsetsToTransaction.isSuccessful());
    }

    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
    }

    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        Assert.assertFalse(beginAbort.isCompleted());
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
    }

    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp1, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup"));
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, "myconsumergroup", 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(this.transactionManager.isReady());
        Assert.assertTrue(beginAbort.isCompleted());
        Assert.assertTrue(beginAbort.isSuccessful());
    }

    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp1, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup"));
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, "myconsumergroup", 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertFalse(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasFatalError());
    }

    @Test
    public void testSendOffsetsWithGroupMetadata() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new OffsetAndMetadata(1L));
        hashMap.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup", 5, "member", Optional.of("instance")));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short) 1);
        this.sender.runOnce();
        Assert.assertFalse(sendOffsetsToTransaction.isCompleted());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, Errors.NONE);
        hashMap2.put(this.tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, "instance", "member", 5, hashMap2);
        Assert.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        this.sender.runOnce();
        this.sender.runOnce();
        Assert.assertNotNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse(sendOffsetsToTransaction.isCompleted());
        hashMap2.put(this.tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, "instance", "member", 5, hashMap2);
        this.sender.runOnce();
        Assert.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assert.assertTrue(sendOffsetsToTransaction.isSuccessful());
    }

    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        appendToAccumulator(this.tp0);
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        appendToAccumulator(this.tp1);
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node = new Node(0, "localhost", 1111);
        Node node2 = new Node(1, "localhost", 1112);
        Cluster cluster = new Cluster((String) null, Arrays.asList(node, node2), Arrays.asList(new PartitionInfo("test", 0, node, (Node[]) null, (Node[]) null), new PartitionInfo("test", 1, node2, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        HashSet hashSet = new HashSet();
        hashSet.add(node);
        hashSet.add(node2);
        Map drain = this.accumulator.drain(cluster, hashSet, MAX_RETRIES, this.time.milliseconds());
        Assert.assertTrue(drain.containsKey(Integer.valueOf(node.id())));
        Assert.assertTrue(((List) drain.get(Integer.valueOf(node.id()))).isEmpty());
        Assert.assertTrue(drain.containsKey(Integer.valueOf(node2.id())));
        Assert.assertTrue(((List) drain.get(Integer.valueOf(node2.id()))).isEmpty());
        Assert.assertFalse(this.transactionManager.hasError());
    }

    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        prepareAddPartitionsToTxn(this.tp1, Errors.NONE);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp1));
        });
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node = new Node(1, "localhost", 1112);
        Cluster cluster = new Cluster((String) null, Collections.singletonList(node), Collections.singletonList(new PartitionInfo("test", 1, node, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        appendToAccumulator(this.tp1);
        Assert.assertTrue(this.accumulator.drain(cluster, Collections.singleton(node), MAX_RETRIES, this.time.milliseconds()).containsKey(Integer.valueOf(node.id())));
        Assert.assertEquals(1L, ((List) r0.get(Integer.valueOf(node.id()))).size());
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        appendToAccumulator(this.tp0);
        Node node = new Node(0, "localhost", 1111);
        Cluster cluster = new Cluster((String) null, Collections.singletonList(node), Collections.singletonList(new PartitionInfo("test", 0, node, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        HashSet hashSet = new HashSet();
        hashSet.add(node);
        Map drain = this.accumulator.drain(cluster, hashSet, MAX_RETRIES, this.time.milliseconds());
        Assert.assertTrue(drain.containsKey(Integer.valueOf(node.id())));
        Assert.assertTrue(((List) drain.get(Integer.valueOf(node.id()))).isEmpty());
    }

    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        this.transactionManager.transitionToAbortableError(new KafkaException());
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertNotNull(appendToAccumulator.get());
    }

    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(appendToAccumulator.isDone());
        this.time.sleep(10000L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 100L);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        try {
            appendToAccumulator.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp1);
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        hashMap.put(this.tp1, Errors.NONE);
        prepareAddPartitionsToTxn(hashMap);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp1));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Assert.assertFalse(appendToAccumulator.isDone());
        Assert.assertFalse(appendToAccumulator2.isDone());
        this.time.sleep(10000L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 100L);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        try {
            appendToAccumulator.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        try {
            appendToAccumulator2.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        this.time.sleep(10000L);
        this.client.disconnect(((Node) this.metadata.fetch().nodes().get(0)).idString());
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        try {
            appendToAccumulator.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        beginCommit.getClass();
        runUntil(beginCommit::isCompleted);
        Assert.assertFalse(beginCommit.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse(this.transactionManager.isCompleting());
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    @Test
    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
        this.apiVersions.update("0", new NodeApiVersions(Arrays.asList(new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7))));
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.transactionContainsPartition(this.tp0));
        });
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(appendToAccumulator.isDone());
        TransactionalRequestResult beginCommit = this.transactionManager.beginCommit();
        this.time.sleep(10000L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 100L);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        try {
            appendToAccumulator.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        beginCommit.getClass();
        runUntil(beginCommit::isCompleted);
        Assert.assertFalse(beginCommit.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testBumpEpochAfterTimeoutWithoutPendingInflightRequests() {
        initializeTransactionManager(Optional.empty());
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(15L, (short) 5);
        initializeIdempotentProducerId(15L, (short) 5);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assert.assertEquals(0, this.transactionManager.sequenceNumber(topicPartition));
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "1");
        Assert.assertEquals(1, this.transactionManager.sequenceNumber(topicPartition));
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, this.time.milliseconds(), 0L));
        Assert.assertEquals(OptionalInt.of(0), this.transactionManager.lastAckedSequence(topicPartition));
        this.transactionManager.markSequenceUnresolved(writeIdempotentBatchWithValue);
        this.transactionManager.maybeResolveSequences();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        Assert.assertFalse(this.transactionManager.hasUnresolvedSequences());
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "2");
        Assert.assertEquals(2, this.transactionManager.sequenceNumber(topicPartition));
        this.transactionManager.markSequenceUnresolved(writeIdempotentBatchWithValue2);
        this.transactionManager.handleFailedBatch(writeIdempotentBatchWithValue2, new TimeoutException(), false);
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.maybeResolveSequences();
        Assert.assertFalse(this.transactionManager.hasUnresolvedSequences());
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 6);
        });
    }

    @Test
    public void testNoProducerIdResetAfterLastInFlightBatchSucceeds() {
        initializeTransactionManager(Optional.empty());
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(15L, (short) 5);
        initializeIdempotentProducerId(15L, (short) 5);
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "2");
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "3");
        Assert.assertEquals(3L, this.transactionManager.sequenceNumber(topicPartition).intValue());
        this.transactionManager.markSequenceUnresolved(writeIdempotentBatchWithValue);
        this.transactionManager.handleFailedBatch(writeIdempotentBatchWithValue, new TimeoutException(), false);
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.handleFailedBatch(writeIdempotentBatchWithValue2, new TimeoutException(), false);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue3, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, this.time.milliseconds(), 0L));
        this.transactionManager.maybeResolveSequences();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        Assert.assertFalse(this.transactionManager.hasUnresolvedSequences());
        Assert.assertEquals(3L, this.transactionManager.sequenceNumber(topicPartition).intValue());
    }

    @Test
    public void testEpochBumpAfterLastInflightBatchFails() {
        initializeTransactionManager(Optional.empty());
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(15L, (short) 5);
        initializeIdempotentProducerId(15L, (short) 5);
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "2");
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, topicPartition, "3");
        Assert.assertEquals(3, this.transactionManager.sequenceNumber(topicPartition));
        this.transactionManager.markSequenceUnresolved(writeIdempotentBatchWithValue);
        this.transactionManager.handleFailedBatch(writeIdempotentBatchWithValue, new TimeoutException(), false);
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue2, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, this.time.milliseconds(), 0L));
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assert.assertEquals(producerIdAndEpoch, this.transactionManager.producerIdAndEpoch());
        Assert.assertTrue(this.transactionManager.hasUnresolvedSequences());
        this.transactionManager.handleFailedBatch(writeIdempotentBatchWithValue3, new TimeoutException(), false);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 6);
        });
        Assert.assertFalse(this.transactionManager.hasUnresolvedSequences());
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(topicPartition).intValue());
    }

    @Test
    public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
        this.apiVersions.update("0", new NodeApiVersions(Arrays.asList(new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7))));
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, (short) 1);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() throws InterruptedException {
        this.apiVersions.update("0", new NodeApiVersions(Arrays.asList(new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7))));
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp1);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1, this.tp1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        Assert.assertTrue(this.transactionManager.isPartitionAdded(this.tp1));
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        Assert.assertTrue(this.transactionManager.isPartitionAdded(this.tp0));
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        FutureRecordMetadata appendToAccumulator4 = appendToAccumulator(this.tp0);
        this.client.prepareResponse(produceRequestMatcher(13131L, (short) 1, this.tp0), (AbstractResponse) produceResponse(this.tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        appendToAccumulator4.getClass();
        runUntil(appendToAccumulator4::isDone);
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(1L, this.transactionManager.sequenceNumber(this.tp1).intValue());
    }

    @Test
    public void testBumpTransactionalEpochOnAbortableError() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, (short) 1);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertTrue(beginAbort.isCompleted());
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 2, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testBumpTransactionalEpochOnUnknownProducerIdError() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        this.client.prepareResponse(produceRequestMatcher(13131L, (short) 1, this.tp0), (AbstractResponse) produceResponse(this.tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertTrue(beginAbort.isCompleted());
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 2, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testBumpTransactionalEpochOnTimeout() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        FutureRecordMetadata appendToAccumulator2 = appendToAccumulator(this.tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator2.getClass();
        runUntil(appendToAccumulator2::isDone);
        FutureRecordMetadata appendToAccumulator3 = appendToAccumulator(this.tp0);
        MockClient mockClient = this.client;
        mockClient.getClass();
        runUntil(mockClient::hasInFlightRequests);
        this.time.sleep(10000L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 100L);
        appendToAccumulator3.getClass();
        runUntil(appendToAccumulator3::isDone);
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        this.sender.runOnce();
        this.time.sleep(110L);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertTrue(beginAbort.isCompleted());
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 2, 13131L);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.isPartitionAdded(this.tp0));
        });
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, this.tp0, (short) 1, 13131L);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertEquals(2L, this.transactionManager.producerIdAndEpoch().epoch);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
    }

    @Test
    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assert.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator.getClass();
        runUntil(appendToAccumulator::isDone);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(hashMap, new ConsumerGroupMetadata("myconsumergroup"));
        Assert.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, "myconsumergroup", 13131L, (short) 1);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasAbortableError);
        TransactionalRequestResult beginAbort = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        beginAbort.getClass();
        runUntil(beginAbort::isCompleted);
        Assert.assertEquals(2L, this.transactionManager.producerIdAndEpoch().epoch);
        Assert.assertTrue(beginAbort.isSuccessful());
        Assert.assertTrue(this.transactionManager.isReady());
    }

    @Test
    public void testHealthyPartitionRetriesDuringEpochBump() throws InterruptedException {
        initializeTransactionManager(Optional.empty());
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(this.time)), this.time, 1000, 50L, this.transactionManager, this.apiVersions);
        initializeIdempotentProducerId(13131L, (short) 1);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "3");
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "4");
        ProducerBatch writeIdempotentBatchWithValue4 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "5");
        Assert.assertEquals(3L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        long milliseconds = this.time.milliseconds();
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, partitionResponse);
        ProduceResponse.PartitionResponse partitionResponse2 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue3.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue3, partitionResponse2);
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L), writeIdempotentBatchWithValue2));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertEquals(writeIdempotentBatchWithValue2, this.transactionManager.nextBatchBySequence(this.tp0));
        Assert.assertEquals(0L, this.transactionManager.firstInFlightSequence(this.tp0));
        Assert.assertEquals(0L, writeIdempotentBatchWithValue2.baseSequence());
        Assert.assertTrue(writeIdempotentBatchWithValue2.sequenceHasBeenReset());
        Assert.assertEquals(2L, writeIdempotentBatchWithValue2.producerEpoch());
        Assert.assertEquals(writeIdempotentBatchWithValue4, this.transactionManager.nextBatchBySequence(this.tp1));
        Assert.assertEquals(1L, this.transactionManager.firstInFlightSequence(this.tp1));
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.baseSequence());
        Assert.assertFalse(writeIdempotentBatchWithValue4.sequenceHasBeenReset());
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.producerEpoch());
        appendToAccumulator(this.tp1);
        sender.runOnce();
        Assert.assertEquals(1L, ((Deque) this.accumulator.batches().get(this.tp1)).size());
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L, 600L), writeIdempotentBatchWithValue4));
        this.accumulator.reenqueue(writeIdempotentBatchWithValue4, this.time.milliseconds());
        sender.runOnce();
        Assert.assertEquals(1L, ((Deque) this.accumulator.batches().get(this.tp1)).size());
        Assert.assertNotEquals(writeIdempotentBatchWithValue4, ((Deque) this.accumulator.batches().get(this.tp1)).peek());
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.producerEpoch());
        ProduceResponse.PartitionResponse partitionResponse3 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue4.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue4, partitionResponse3);
        Assert.assertFalse(this.transactionManager.hasInflightBatches(this.tp1));
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.hasInflightBatches(this.tp1));
        });
        Assert.assertTrue(((Deque) this.accumulator.batches().get(this.tp1)).isEmpty());
        ProducerBatch nextBatchBySequence = this.transactionManager.nextBatchBySequence(this.tp1);
        Assert.assertEquals(2L, nextBatchBySequence.producerEpoch());
        ProduceResponse.PartitionResponse partitionResponse4 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        nextBatchBySequence.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(nextBatchBySequence, partitionResponse4);
        Assert.assertFalse(this.transactionManager.hasInflightBatches(this.tp1));
        Assert.assertEquals(1L, this.transactionManager.sequenceNumber(this.tp1).intValue());
    }

    @Test
    public void testRetryAbortTransaction() throws InterruptedException {
        verifyCommitOrAbortTransactionRetriable(TransactionResult.ABORT, TransactionResult.ABORT);
    }

    @Test
    public void testRetryCommitTransaction() throws InterruptedException {
        verifyCommitOrAbortTransactionRetriable(TransactionResult.COMMIT, TransactionResult.COMMIT);
    }

    @Test(expected = KafkaException.class)
    public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException {
        verifyCommitOrAbortTransactionRetriable(TransactionResult.COMMIT, TransactionResult.ABORT);
    }

    @Test(expected = KafkaException.class)
    public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException {
        verifyCommitOrAbortTransactionRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
    }

    @Test
    public void testCanBumpEpochDuringCoordinatorDisconnect() {
        doInitTransactions(0L, (short) 0);
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertTrue(this.transactionManager.canBumpEpoch());
        this.apiVersions.remove(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION).idString());
        Assert.assertTrue(this.transactionManager.canBumpEpoch());
    }

    @Test
    public void testFailedInflightBatchAfterEpochBump() throws InterruptedException {
        initializeTransactionManager(Optional.empty());
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(this.time)), this.time, 1000, 50L, this.transactionManager, this.apiVersions);
        initializeIdempotentProducerId(13131L, (short) 1);
        ProducerBatch writeIdempotentBatchWithValue = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch writeIdempotentBatchWithValue2 = writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "3");
        ProducerBatch writeIdempotentBatchWithValue3 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "4");
        ProducerBatch writeIdempotentBatchWithValue4 = writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "5");
        Assert.assertEquals(3L, this.transactionManager.sequenceNumber(this.tp0).intValue());
        Assert.assertEquals(2L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        long milliseconds = this.time.milliseconds();
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue, partitionResponse);
        ProduceResponse.PartitionResponse partitionResponse2 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue3.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue3, partitionResponse2);
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L), writeIdempotentBatchWithValue2));
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.producerIdAndEpoch().epoch == 2);
        });
        Assert.assertEquals(writeIdempotentBatchWithValue2, this.transactionManager.nextBatchBySequence(this.tp0));
        Assert.assertEquals(0L, this.transactionManager.firstInFlightSequence(this.tp0));
        Assert.assertEquals(0L, writeIdempotentBatchWithValue2.baseSequence());
        Assert.assertTrue(writeIdempotentBatchWithValue2.sequenceHasBeenReset());
        Assert.assertEquals(2L, writeIdempotentBatchWithValue2.producerEpoch());
        Assert.assertEquals(writeIdempotentBatchWithValue4, this.transactionManager.nextBatchBySequence(this.tp1));
        Assert.assertEquals(1L, this.transactionManager.firstInFlightSequence(this.tp1));
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.baseSequence());
        Assert.assertFalse(writeIdempotentBatchWithValue4.sequenceHasBeenReset());
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.producerEpoch());
        appendToAccumulator(this.tp1);
        sender.runOnce();
        Assert.assertEquals(1L, ((Deque) this.accumulator.batches().get(this.tp1)).size());
        Assert.assertTrue(this.transactionManager.canRetry(new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L, 600L), writeIdempotentBatchWithValue4));
        this.accumulator.reenqueue(writeIdempotentBatchWithValue4, this.time.milliseconds());
        sender.runOnce();
        Assert.assertEquals(1L, ((Deque) this.accumulator.batches().get(this.tp1)).size());
        Assert.assertNotEquals(writeIdempotentBatchWithValue4, ((Deque) this.accumulator.batches().get(this.tp1)).peek());
        Assert.assertEquals(1L, writeIdempotentBatchWithValue4.producerEpoch());
        ProduceResponse.PartitionResponse partitionResponse3 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        writeIdempotentBatchWithValue4.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(writeIdempotentBatchWithValue4, partitionResponse3);
        Assert.assertFalse(this.transactionManager.hasInflightBatches(this.tp1));
        Assert.assertEquals(0L, this.transactionManager.sequenceNumber(this.tp1).intValue());
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.hasInflightBatches(this.tp1));
        });
        Assert.assertTrue(((Deque) this.accumulator.batches().get(this.tp1)).isEmpty());
        ProducerBatch nextBatchBySequence = this.transactionManager.nextBatchBySequence(this.tp1);
        Assert.assertEquals(2L, nextBatchBySequence.producerEpoch());
        ProduceResponse.PartitionResponse partitionResponse4 = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, milliseconds, 0L);
        nextBatchBySequence.done(500L, milliseconds, (RuntimeException) null);
        this.transactionManager.handleCompletedBatch(nextBatchBySequence, partitionResponse4);
        Assert.assertFalse(this.transactionManager.hasInflightBatches(this.tp1));
        Assert.assertEquals(1L, this.transactionManager.sequenceNumber(this.tp1).intValue());
    }

    private FutureRecordMetadata appendToAccumulator(TopicPartition topicPartition) throws InterruptedException {
        long milliseconds = this.time.milliseconds();
        return this.accumulator.append(topicPartition, milliseconds, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L, false, milliseconds).future;
    }

    private void verifyCommitOrAbortTransactionRetriable(TransactionResult transactionResult, TransactionResult transactionResult2) throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        TransactionalRequestResult beginCommit = transactionResult == TransactionResult.COMMIT ? this.transactionManager.beginCommit() : this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, transactionResult, 13131L, (short) 1, true);
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        Assert.assertFalse(beginCommit.isCompleted());
        try {
            beginCommit.await(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have raised TimeoutException");
        } catch (TimeoutException e) {
        }
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(!this.client.hasPendingResponses());
        });
        TransactionalRequestResult beginCommit2 = transactionResult2 == TransactionResult.COMMIT ? this.transactionManager.beginCommit() : this.transactionManager.beginAbort();
        Assert.assertEquals(beginCommit2, beginCommit);
        prepareEndTxnResponse(Errors.NONE, transactionResult2, 13131L, (short) 1, false);
        beginCommit2.getClass();
        runUntil(beginCommit2::isCompleted);
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    private void verifyAddPartitionsFailsWithPartitionLevelError(Errors errors) throws InterruptedException {
        doInitTransactions(1L, (short) 1);
        this.transactionManager.beginTransaction();
        this.transactionManager.failIfNotReadyForSend();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        Assert.assertFalse(appendToAccumulator(this.tp0).isDone());
        prepareAddPartitionsToTxn(this.tp0, errors);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasError);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    private void prepareAddPartitionsToTxn(Map<TopicPartition, Errors> map) {
        this.client.prepareResponse(abstractRequest -> {
            Assert.assertEquals(new HashSet(((AddPartitionsToTxnRequest) abstractRequest).partitions()), new HashSet(map.keySet()));
            return true;
        }, (AbstractResponse) new AddPartitionsToTxnResponse(0, map));
    }

    private void prepareAddPartitionsToTxn(TopicPartition topicPartition, Errors errors) {
        prepareAddPartitionsToTxn(Collections.singletonMap(topicPartition, errors));
    }

    private void prepareFindCoordinatorResponse(Errors errors, boolean z, FindCoordinatorRequest.CoordinatorType coordinatorType, String str) {
        this.client.prepareResponse(abstractRequest -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) abstractRequest;
            Assert.assertEquals(FindCoordinatorRequest.CoordinatorType.forId(findCoordinatorRequest.data().keyType()), coordinatorType);
            Assert.assertEquals(findCoordinatorRequest.data().key(), str);
            return true;
        }, FindCoordinatorResponse.prepareResponse(errors, this.brokerNode), z);
    }

    private void prepareInitPidResponse(Errors errors, boolean z, long j, short s) {
        this.client.prepareResponse(abstractRequest -> {
            Assert.assertEquals("foobar", ((InitProducerIdRequest) abstractRequest).data.transactionalId());
            Assert.assertEquals(1121L, r0.data.transactionTimeoutMs());
            return true;
        }, new InitProducerIdResponse(new InitProducerIdResponseData().setErrorCode(errors.code()).setProducerEpoch(s).setProducerId(j).setThrottleTimeMs(0)), z);
    }

    private void sendProduceResponse(Errors errors, long j, short s) {
        sendProduceResponse(errors, j, s, this.tp0);
    }

    private void sendProduceResponse(Errors errors, long j, short s, TopicPartition topicPartition) {
        this.client.respond(produceRequestMatcher(j, s, topicPartition), (AbstractResponse) produceResponse(topicPartition, 0L, errors, 0));
    }

    private void prepareProduceResponse(Errors errors, long j, short s) {
        prepareProduceResponse(errors, j, s, this.tp0);
    }

    private void prepareProduceResponse(Errors errors, long j, short s, TopicPartition topicPartition) {
        this.client.prepareResponse(produceRequestMatcher(j, s, topicPartition), (AbstractResponse) produceResponse(topicPartition, 0L, errors, 0));
    }

    private MockClient.RequestMatcher produceRequestMatcher(long j, short s) {
        return produceRequestMatcher(j, s, this.tp0);
    }

    private MockClient.RequestMatcher produceRequestMatcher(long j, short s, TopicPartition topicPartition) {
        return abstractRequest -> {
            ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
            MemoryRecords memoryRecords = (MemoryRecords) produceRequest.partitionRecordsOrFail().get(topicPartition);
            Assert.assertNotNull(memoryRecords);
            Iterator it = memoryRecords.batches().iterator();
            Assert.assertTrue(it.hasNext());
            MutableRecordBatch mutableRecordBatch = (MutableRecordBatch) it.next();
            Assert.assertFalse(it.hasNext());
            Assert.assertTrue(mutableRecordBatch.isTransactional());
            Assert.assertEquals(j, mutableRecordBatch.producerId());
            Assert.assertEquals(s, mutableRecordBatch.producerEpoch());
            Assert.assertEquals("foobar", produceRequest.transactionalId());
            return true;
        };
    }

    private void prepareAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        this.client.prepareResponse(addPartitionsRequestMatcher(topicPartition, s, j), (AbstractResponse) new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, errors)));
    }

    private void sendAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        this.client.respond(addPartitionsRequestMatcher(topicPartition, s, j), (AbstractResponse) new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, errors)));
    }

    private MockClient.RequestMatcher addPartitionsRequestMatcher(TopicPartition topicPartition, short s, long j) {
        return abstractRequest -> {
            AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) abstractRequest;
            Assert.assertEquals(j, addPartitionsToTxnRequest.producerId());
            Assert.assertEquals(s, addPartitionsToTxnRequest.producerEpoch());
            Assert.assertEquals(Collections.singletonList(topicPartition), addPartitionsToTxnRequest.partitions());
            Assert.assertEquals("foobar", addPartitionsToTxnRequest.transactionalId());
            return true;
        };
    }

    private void prepareEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        prepareEndTxnResponse(errors, transactionResult, j, s, false);
    }

    private void prepareEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s, boolean z) {
        this.client.prepareResponse(endTxnMatcher(transactionResult, j, s), new EndTxnResponse(new EndTxnResponseData().setErrorCode(errors.code()).setThrottleTimeMs(0)), z);
    }

    private void sendEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        this.client.respond(endTxnMatcher(transactionResult, j, s), (AbstractResponse) new EndTxnResponse(new EndTxnResponseData().setErrorCode(errors.code()).setThrottleTimeMs(0)));
    }

    private MockClient.RequestMatcher endTxnMatcher(TransactionResult transactionResult, long j, short s) {
        return abstractRequest -> {
            EndTxnRequest endTxnRequest = (EndTxnRequest) abstractRequest;
            Assert.assertEquals("foobar", endTxnRequest.data.transactionalId());
            Assert.assertEquals(j, endTxnRequest.data.producerId());
            Assert.assertEquals(s, endTxnRequest.data.producerEpoch());
            Assert.assertEquals(transactionResult, endTxnRequest.result());
            return true;
        };
    }

    private void prepareAddOffsetsToTxnResponse(Errors errors, String str, long j, short s) {
        this.client.prepareResponse(abstractRequest -> {
            AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) abstractRequest;
            Assert.assertEquals(str, addOffsetsToTxnRequest.consumerGroupId());
            Assert.assertEquals("foobar", addOffsetsToTxnRequest.transactionalId());
            Assert.assertEquals(j, addOffsetsToTxnRequest.producerId());
            Assert.assertEquals(s, addOffsetsToTxnRequest.producerEpoch());
            return true;
        }, (AbstractResponse) new AddOffsetsToTxnResponse(0, errors));
    }

    private void prepareTxnOffsetCommitResponse(String str, long j, short s, Map<TopicPartition, Errors> map) {
        this.client.prepareResponse(abstractRequest -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) abstractRequest;
            Assert.assertEquals(str, txnOffsetCommitRequest.data.groupId());
            Assert.assertEquals(j, txnOffsetCommitRequest.data.producerId());
            Assert.assertEquals(s, txnOffsetCommitRequest.data.producerEpoch());
            return true;
        }, (AbstractResponse) new TxnOffsetCommitResponse(0, map));
    }

    private void prepareTxnOffsetCommitResponse(String str, long j, short s, String str2, String str3, int i, Map<TopicPartition, Errors> map) {
        this.client.prepareResponse(abstractRequest -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) abstractRequest;
            Assert.assertEquals(str, txnOffsetCommitRequest.data.groupId());
            Assert.assertEquals(j, txnOffsetCommitRequest.data.producerId());
            Assert.assertEquals(s, txnOffsetCommitRequest.data.producerEpoch());
            Assert.assertEquals(str2, txnOffsetCommitRequest.data.groupInstanceId());
            Assert.assertEquals(str3, txnOffsetCommitRequest.data.memberId());
            Assert.assertEquals(i, txnOffsetCommitRequest.data.generationId());
            return true;
        }, (AbstractResponse) new TxnOffsetCommitResponse(0, map));
    }

    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i) {
        return produceResponse(topicPartition, j, errors, i, 10);
    }

    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i, int i2) {
        return new ProduceResponse(Collections.singletonMap(topicPartition, new ProduceResponse.PartitionResponse(errors, j, -1L, i2)), i);
    }

    private void initializeIdempotentProducerId(long j, short s) {
        this.client.prepareResponse(abstractRequest -> {
            Assert.assertNull(((InitProducerIdRequest) abstractRequest).data.transactionalId());
            return true;
        }, new InitProducerIdResponse(new InitProducerIdResponseData().setErrorCode(Errors.NONE.code()).setProducerEpoch(s).setProducerId(j).setThrottleTimeMs(0)), false);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasProducerId);
    }

    private void doInitTransactions(long j, short s) {
        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> {
            return Boolean.valueOf(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        });
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareInitPidResponse(Errors.NONE, false, j, s);
        TransactionManager transactionManager = this.transactionManager;
        transactionManager.getClass();
        runUntil(transactionManager::hasProducerId);
    }

    private void assertAbortableError(Class<? extends RuntimeException> cls) {
        try {
            this.transactionManager.beginCommit();
            Assert.fail("Should have raised " + cls.getSimpleName());
        } catch (KafkaException e) {
            Assert.assertTrue(cls.isAssignableFrom(e.getCause().getClass()));
            Assert.assertTrue(this.transactionManager.hasError());
        }
        Assert.assertTrue(this.transactionManager.hasError());
        this.transactionManager.beginAbort();
        Assert.assertFalse(this.transactionManager.hasError());
    }

    private void assertFatalError(Class<? extends RuntimeException> cls) {
        Assert.assertTrue(this.transactionManager.hasError());
        try {
            this.transactionManager.beginAbort();
            Assert.fail("Should have raised " + cls.getSimpleName());
        } catch (KafkaException e) {
            Assert.assertTrue(cls.isAssignableFrom(e.getCause().getClass()));
            Assert.assertTrue(this.transactionManager.hasError());
        }
        try {
            this.transactionManager.beginAbort();
            Assert.fail("Should have raised " + cls.getSimpleName());
        } catch (KafkaException e2) {
            Assert.assertTrue(cls.isAssignableFrom(e2.getCause().getClass()));
            Assert.assertTrue(this.transactionManager.hasError());
        }
    }

    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assert.assertTrue(future.isDone());
        try {
            future.get();
            Assert.fail("Expected produce future to throw");
        } catch (ExecutionException e) {
        }
    }

    private void runUntil(Supplier<Boolean> supplier) {
        for (int i = 0; i < 5 && !supplier.get().booleanValue(); i++) {
            this.sender.runOnce();
        }
        if (!supplier.get().booleanValue()) {
            throw new AssertionError("Condition was not satisfied after multiple runs");
        }
    }
}
