/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerTestUtils;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TransactionManagerTest {
    // ---- Static sizing/timing constants -------------------------------------------------
    // NOTE(review): in the visible part of this decompiled file these constants are not
    // referenced by name; equal literal values appear inline where the Sender and the
    // TransactionManager are constructed — presumably the compiler inlined them.
    // 0x100000 == 1048576 (1024 * 1024).
    private static final int MAX_REQUEST_SIZE = 0x100000;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;

    // ---- Identity of the producer / transaction used by the tests -----------------------
    private final String transactionalId = "foobar";
    private final int transactionTimeoutMs = 1121;
    private final String topic = "test";
    // Two partitions of the "test" topic; setup() registers that topic with a count of 2.
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final long producerId = 13131L;
    private final short epoch = 1;

    // ---- Consumer group identity --------------------------------------------------------
    // Not used in the visible part of the file; presumably consumed by offset-commit tests
    // further down — TODO confirm.
    private final String consumerGroupId = "myConsumerGroup";
    private final String memberId = "member";
    private final int generationId = 5;
    private final String groupInstanceId = "instance";

    // ---- Shared collaborators -----------------------------------------------------------
    // Declaration order matters here: metadata is built from logContext and time, and the
    // mock client is built from time and metadata.
    private final LogContext logContext = new LogContext();
    private final MockTime time = new MockTime();
    private final ProducerMetadata metadata = new ProducerMetadata(0L, Long.MAX_VALUE, Long.MAX_VALUE, this.logContext, new ClusterResourceListeners(), (Time)this.time);
    private final MockClient client = new MockClient((Time)this.time, (Metadata)this.metadata);
    private final ApiVersions apiVersions = new ApiVersions();

    // ---- Per-test state, (re)assigned by setup() / initializeTransactionManager() -------
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;

    /**
     * Per-test fixture: registers the test topic with the producer metadata, pushes a
     * metadata update into the mock client, records the broker node and finally builds a
     * transactional {@link TransactionManager} (plus accumulator and sender).
     */
    @BeforeEach
    public void setup() {
        final long nowMs = time.milliseconds();
        metadata.add(topic, nowMs);

        // Update built by RequestTestUtils.metadataUpdateWith(1, {topic -> 2}).
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 2)));

        brokerNode = new Node(0, "localhost", 2211);
        initializeTransactionManager(Optional.of(transactionalId));
    }

    /**
     * (Re)creates the {@link TransactionManager}, {@link RecordAccumulator} and
     * {@link Sender} used by the tests.
     *
     * @param transactionalId the transactional id to configure; when empty, {@code null} is
     *                        handed to the {@link TransactionManager} constructor
     */
    private void initializeTransactionManager(Optional<String> transactionalId) {
        final Metrics metrics = new Metrics((Time) time);

        // Advertise the API versions of node "0": INIT_PRODUCER_ID 0..3 and PRODUCE 0..7.
        final ApiVersionsResponseData.ApiVersion initProducerIdVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 3);
        final ApiVersionsResponseData.ApiVersion produceVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7);
        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(initProducerIdVersions, produceVersions)));

        transactionManager = new TransactionManager(
                logContext,
                transactionalId.orElse(null),
                transactionTimeoutMs,
                DEFAULT_RETRY_BACKOFF_MS,
                apiVersions);

        final int batchSize = 16 * 1024;
        final int deliveryTimeoutMs = 3000;
        final long totalSize = 1024L * 1024L;
        final String metricGrpName = "producer-metrics";

        brokerNode = new Node(0, "localhost", 2211);

        final BufferPool bufferPool = new BufferPool(totalSize, batchSize, metrics, (Time) time, metricGrpName);
        accumulator = new RecordAccumulator(
                logContext,
                batchSize,
                CompressionType.NONE,
                0,
                0L,
                deliveryTimeoutMs,
                metrics,
                metricGrpName,
                (Time) time,
                apiVersions,
                transactionManager,
                bufferPool);

        sender = new Sender(
                logContext,
                (KafkaClient) client,
                metadata,
                accumulator,
                true,
                MAX_REQUEST_SIZE,
                ACKS_ALL,
                MAX_RETRIES,
                new SenderMetricsRegistry(metrics),
                (Time) time,
                REQUEST_TIMEOUT,
                50L,
                transactionManager,
                apiVersions);
    }

    /**
     * Starts a transaction with one appended record, asks the sender to close, and then
     * drives the sender until both the commit result and the send future report completion.
     */
    @Test
    public void testSenderShutdownWithPendingTransactions() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        final FutureRecordMetadata pendingSend = appendToAccumulator(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, producerId, epoch);
        runUntil(() -> !client.hasPendingResponses());

        // Close is initiated while the transaction is still open.
        sender.initiateClose();
        sender.runOnce();

        final TransactionalRequestResult commitResult = transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
        runUntil(() -> commitResult.isCompleted());
        runUntil(() -> pendingSend.isDone());
    }

    /**
     * After {@code beginCommit()}, {@code nextRequest(true)} must yield nothing while
     * {@code nextRequest(false)} must yield an EndTxn handler.
     */
    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        transactionManager.beginCommit();

        // NOTE(review): the flag presumably means "there are incomplete batches" (per the
        // test name) — confirm against TransactionManager.nextRequest.
        Assertions.assertNull(transactionManager.nextRequest(true));
        Assertions.assertTrue(transactionManager.nextRequest(false).isEndTxn());
    }

    /**
     * Adding a partition on the freshly set-up transactional manager (no init, no begin)
     * must raise {@link IllegalStateException}.
     */
    @Test
    public void testFailIfNotReadyForSendNoProducerId() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * With no transactional id configured, {@code maybeAddPartition} must complete without
     * throwing.
     *
     * <p>The expectation is stated explicitly via {@code assertDoesNotThrow} so that a
     * regression is reported as an assertion failure rather than as an unexpected error
     * in a test that otherwise contains no assertion.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        initializeTransactionManager(Optional.empty());
        Assertions.assertDoesNotThrow(() -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * With no transactional id configured, a manager that has been moved to the fatal-error
     * state must reject {@code maybeAddPartition} with a {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        initializeTransactionManager(Optional.empty());

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * After initialisation but before {@code beginTransaction()}, adding a partition must
     * raise {@link IllegalStateException}.
     */
    @Test
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        doInitTransactions();

        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Once an open transaction has been moved to the abortable-error state, adding a
     * partition must raise {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();

        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Once the initialised manager has been moved to the fatal-error state, adding a
     * partition must raise {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendAfterFatalError() {
        doInitTransactions();

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, partition add and abort:
     * false before begin, true from begin until the EndTxn(ABORT) response is processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        // No transaction before or right after initialisation.
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(fooPartition);
        runUntil(() -> transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(fooPartition));

        // Still ongoing while the abort is in progress.
        transactionManager.beginAbort();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, partition add and commit:
     * false before begin, true from begin until the EndTxn(COMMIT) response is processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        // No transaction before or right after initialisation.
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(fooPartition));

        // Still ongoing while the commit is in progress.
        transactionManager.beginCommit();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * An abortable error keeps the transaction "ongoing"; it only ends once the subsequent
     * abort's EndTxn response has been processed.
     */
    @Test
    public void testHasOngoingTransactionAbortableError() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        // No transaction before or right after initialisation.
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(fooPartition));

        // The abortable error alone does not end the transaction.
        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.beginAbort();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * A fatal error immediately makes {@code hasOngoingTransaction()} report false, even
     * though a partition had already been added to the open transaction.
     */
    @Test
    public void testHasOngoingTransactionFatalError() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        // No transaction before or right after initialisation.
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(fooPartition));

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Follows a partition from "pending add" to "added" and checks that adding the same
     * partition a second time leaves the manager's bookkeeping unchanged.
     */
    @Test
    public void testMaybeAddPartitionToTransaction() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        // First add: queued, not yet acknowledged.
        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        // Preparing the response alone changes nothing; the sender has to run first.
        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());

        runUntil(() -> transactionManager.isPartitionAdded(fooPartition));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(fooPartition));

        // Second add of the same partition: nothing new to add.
        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertTrue(transactionManager.isPartitionAdded(fooPartition));
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(fooPartition));
    }

    /**
     * After an AddPartitionsToTxn response carrying CONCURRENT_TRANSACTIONS, the next
     * request handler must report a retry backoff of 20 ms.
     */
    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        prepareAddPartitionsToTxn(fooPartition, Errors.CONCURRENT_TRANSACTIONS);
        runUntil(() -> !client.hasPendingResponses());

        final TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(retryHandler);
        Assertions.assertEquals(20L, (long) retryHandler.retryBackoffMs());
    }

    /**
     * After an AddPartitionsToTxn response carrying COORDINATOR_NOT_AVAILABLE, the next
     * request handler must report the configured retry backoff (100 ms).
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(fooPartition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        prepareAddPartitionsToTxn(fooPartition, Errors.COORDINATOR_NOT_AVAILABLE);
        runUntil(() -> !client.hasPendingResponses());

        final TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(retryHandler);
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MS, (long) retryHandler.retryBackoffMs());
    }

    /**
     * Once one partition has been added successfully, the handler for adding a further
     * partition must report the configured retry backoff (100 ms), even though a
     * CONCURRENT_TRANSACTIONS response has been queued for it.
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        final TopicPartition firstPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(firstPartition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(firstPartition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(firstPartition));

        prepareAddPartitionsToTxn(firstPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(firstPartition));

        final TopicPartition secondPartition = new TopicPartition("foo", 1);
        transactionManager.maybeAddPartition(secondPartition);
        prepareAddPartitionsToTxn(secondPartition, Errors.CONCURRENT_TRANSACTIONS);

        // The handler is inspected directly; the sender is not run for the second add.
        final TransactionManager.TxnRequestHandler nextHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(nextHandler);
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MS, (long) nextHandler.retryBackoffMs());
    }

    /**
     * Without any initialisation, {@code maybeAddPartition} must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testNotReadyForSendBeforeInitTransactions() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Initialised but with no transaction begun, {@code maybeAddPartition} must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testNotReadyForSendBeforeBeginTransaction() {
        doInitTransactions();

        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * In the abortable-error state of an open transaction, {@code maybeAddPartition} must
     * raise {@link KafkaException}.
     */
    @Test
    public void testNotReadyForSendAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();

        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * In the fatal-error state, {@code maybeAddPartition} must raise
     * {@link KafkaException}.
     */
    @Test
    public void testNotReadyForSendAfterFatalError() {
        doInitTransactions();

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * A partition that was only queued for adding (sender never run) is not sendable once
     * the transaction is in the abortable-error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition whose add request is in flight (no response yet) is not sendable once
     * the transaction is in the abortable-error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // Drive the sender until a request is in flight.
        runUntil(() -> transactionManager.hasInFlightRequest());

        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition that was only queued for adding (sender never run) is not sendable once
     * the manager is in the fatal-error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition whose add request is in flight (no response yet) is not sendable once
     * the manager is in the fatal-error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // Drive the sender until a request is in flight.
        runUntil(() -> transactionManager.hasInFlightRequest());

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition that was fully added before the abortable error occurred remains
     * sendable afterwards.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        final KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Even a partition that was fully added is no longer sendable once the manager is in
     * the fatal-error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        final KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * Inside an open transaction, a partition that was never passed to
     * {@code maybeAddPartition} is not sendable.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        doInitTransactions();
        transactionManager.beginTransaction();

        final boolean sendAllowed = transactionManager.isSendToPartitionAllowed(tp0);
        Assertions.assertFalse(sendAllowed);
    }

    /**
     * With no transactional id configured, the sequence number of a partition starts at 0
     * and reflects the amount passed to {@code incrementSequenceNumber}.
     *
     * <p>Arguments are given in JUnit's (expected, actual) order so that a failure message
     * reports the expected and the observed value the right way round.
     */
    @Test
    public void testDefaultSequenceNumber() {
        initializeTransactionManager(Optional.empty());
        Assertions.assertEquals(0, (int) transactionManager.sequenceNumber(tp0));

        transactionManager.incrementSequenceNumber(tp0, 3);
        Assertions.assertEquals(3, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Writes five batches to one partition, completes the first and answers the second
     * with UNKNOWN_PRODUCER_ID. Once the epoch has reached 2, the four remaining batches
     * must carry base sequences 0..3 and the second batch the new epoch.
     */
    @Test
    public void testBumpEpochAndResetSequenceNumbersAfterUnknownProducerId() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(producerId, epoch);

        final ProducerBatch first = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        final ProducerBatch second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        final ProducerBatch third = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        final ProducerBatch fourth = writeIdempotentBatchWithValue(transactionManager, tp0, "4");
        final ProducerBatch fifth = writeIdempotentBatchWithValue(transactionManager, tp0, "5");
        Assertions.assertEquals(5, (int) transactionManager.sequenceNumber(tp0));

        // First batch succeeds.
        final long firstAppendTime = time.milliseconds();
        final ProduceResponse.PartitionResponse firstResponse =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, firstAppendTime, 0L);
        first.complete(500L, firstAppendTime);
        transactionManager.handleCompletedBatch(first, firstResponse);

        // Second batch is answered with UNKNOWN_PRODUCER_ID and must be retriable.
        final ProduceResponse.PartitionResponse secondResponse =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue(transactionManager.canRetry(secondResponse, second));

        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        Assertions.assertEquals(2, (int) second.producerEpoch());
        Assertions.assertEquals(0, (int) second.baseSequence());
        Assertions.assertEquals(1, (int) third.baseSequence());
        Assertions.assertEquals(2, (int) fourth.baseSequence());
        Assertions.assertEquals(3, (int) fifth.baseSequence());
    }

    /**
     * Starts at the maximum epoch, completes one batch per partition, writes a second
     * batch to each, then reports UNKNOWN_PRODUCER_ID for the first tp0 batch. After
     * {@code bumpIdempotentEpochAndResetIdIfNeeded()} the tp0 sequence must be 1 while the
     * tp1 sequence stays at 2, and the second batch of each partition must be the next
     * batch by sequence.
     *
     * <p>The starting epoch is held in a correctly typed local that is actually passed to
     * the helper, instead of an unused {@code int} local that shadowed the {@code epoch}
     * field.
     */
    @Test
    public void testBatchFailureAfterProducerReset() {
        final short maxEpoch = Short.MAX_VALUE;
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(producerId, maxEpoch);

        // One successfully completed batch on each partition.
        ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch tp1b1 = writeIdempotentBatchWithValue(transactionManager, tp1, "1");
        ProduceResponse.PartitionResponse tp0b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L);
        transactionManager.handleCompletedBatch(tp0b1, tp0b1Response);
        ProduceResponse.PartitionResponse tp1b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L);
        transactionManager.handleCompletedBatch(tp1b1, tp1b1Response);

        // A second, still outstanding batch on each partition.
        ProducerBatch tp0b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "2");
        Assertions.assertEquals(2, (int) transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(2, (int) transactionManager.sequenceNumber(tp1));

        // UNKNOWN_PRODUCER_ID for tp0 must be retriable; tp1 sees another successful completion.
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 400L);
        Assertions.assertTrue(transactionManager.canRetry(b1Response, tp0b1));
        ProduceResponse.PartitionResponse b2Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L);
        transactionManager.handleCompletedBatch(tp1b1, b2Response);

        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

        // Only the failed partition's sequence changes.
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(tp0b2, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(2, (int) transactionManager.sequenceNumber(tp1));
        Assertions.assertEquals(tp1b2, transactionManager.nextBatchBySequence(tp1));
    }

    /**
     * Starts at the maximum epoch, requests an epoch bump for tp1 and installs a new
     * producer id while two tp0 batches are still outstanding. Completing the first tp0
     * batch must leave the tp0 sequence state intact; after the second completes and
     * {@code maybeUpdateProducerIdAndEpoch(tp0)} runs, the tp0 sequence state must be reset.
     *
     * <p>The starting epoch is held in a correctly typed local that is used both for
     * initialisation and for the epoch assertion, instead of an unused {@code int} local
     * that shadowed the {@code epoch} field.
     */
    @Test
    public void testBatchCompletedAfterProducerReset() {
        final short maxEpoch = Short.MAX_VALUE;
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(producerId, maxEpoch);

        ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        writeIdempotentBatchWithValue(transactionManager, tp1, "1");
        ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        Assertions.assertEquals(2, (int) transactionManager.sequenceNumber(tp0));

        // Bump requested for the other partition, followed by a new producer id at epoch 0.
        transactionManager.requestEpochBumpForPartition(tp1);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        initializeIdempotentProducerId(producerId + 1, (short) 0);

        // Completing the first tp0 batch keeps the sequence state; the second batch is
        // still queued and still carries the original epoch.
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(b1, b1Response);
        Assertions.assertEquals(2, (int) transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(0, transactionManager.lastAckedSequence(tp0).getAsInt());
        Assertions.assertEquals(b2, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(maxEpoch, (short) transactionManager.nextBatchBySequence(tp0).producerEpoch());

        // Once the last outstanding tp0 batch completes, the partition state is reset.
        ProduceResponse.PartitionResponse b2Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(b2, b2Response);
        transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
        Assertions.assertEquals(0, (int) transactionManager.sequenceNumber(tp0));
        Assertions.assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
        Assertions.assertNull(transactionManager.nextBatchBySequence(tp0));
    }

    /**
     * Sends one idempotent batch to {@code tp0}, lets it hit the request timeout, be retried
     * and finally fail with a {@code TimeoutException}, then checks that the producer epoch
     * moves from 1 to 2 and the sequence number for {@code tp0} restarts at 0. A second batch
     * appended afterwards must start at sequence 0 and still be in flight (future not done)
     * after the clock advances another 5 seconds.
     */
    @Test
    public void testDuplicateSequenceAfterProducerReset() throws Exception {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, (short) 1);
        Metrics metrics = new Metrics((Time) time);
        // Named timeouts are passed to the accumulator/sender and reused for the clock advances
        // below, instead of repeating the raw numbers.
        final int requestTimeout = 10000;
        final int deliveryTimeout = 15000;
        RecordAccumulator accumulator = new RecordAccumulator(logContext, 16384, CompressionType.NONE, 0, 0L, deliveryTimeout, metrics, "", (Time) time, apiVersions, transactionManager, new BufferPool(0x100000L, 16384, metrics, (Time) time, ""));
        Sender sender = new Sender(logContext, (KafkaClient) client, metadata, accumulator, false, 0x100000, -1, Integer.MAX_VALUE, new SenderMetricsRegistry(metrics), (Time) time, requestTimeout, 0L, transactionManager, apiVersions);
        Assertions.assertEquals(0, (int) transactionManager.sequenceNumber(tp0));

        // First batch: one sender pass assigns sequence 0 and moves the next sequence to 1.
        FutureRecordMetadata responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, 1000L, false, time.milliseconds(), TestUtils.singletonCluster()).future;
        sender.runOnce();
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(tp0));

        // Advance the clock by the request timeout: the request leaves the client's in-flight
        // set, but the batch is still tracked by the transaction manager.
        time.sleep(requestTimeout);
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertTrue(transactionManager.hasInflightBatches(tp0));
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(tp0));

        // The next pass puts a request back in flight without consuming a new sequence number.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertTrue(transactionManager.hasInflightBatches(tp0));
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(tp0));

        // Advance the remaining time up to the delivery timeout: the future fails with a
        // TimeoutException while the client still reports one request in flight.
        time.sleep(deliveryTimeout - requestTimeout);
        sender.runOnce();
        Assertions.assertTrue(responseFuture1.isDone());
        TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class);
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // One more pass: epoch goes 1 -> 2 and the sequence for tp0 is back at 0.
        sender.runOnce();
        Assertions.assertEquals(2, (int) transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(0, (int) transactionManager.sequenceNumber(tp0));

        // Second batch starts again from sequence 0 under the new epoch.
        FutureRecordMetadata responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, 1000L, false, time.milliseconds(), TestUtils.singletonCluster()).future;
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(0, (int) transactionManager.firstInFlightSequence(tp0));
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(tp0));

        // After another 5 seconds the second batch is still in flight and not completed.
        time.sleep(5000L);
        sender.runOnce();
        Assertions.assertTrue(transactionManager.hasInflightBatches(tp0));
        Assertions.assertFalse(responseFuture2.isDone());
    }

    /**
     * Creates a closed, single-record batch for {@code tp} and registers it with
     * {@code manager} as an in-flight idempotent batch.
     *
     * <p>The batch is stamped with the manager's current producer id/epoch and the sequence
     * number read before that number is incremented by one.
     *
     * @param manager transaction manager whose sequence state is advanced
     * @param tp      partition the batch is written for
     * @param value   record value stored in the batch
     * @return the closed batch that was added to the manager's in-flight set
     */
    private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager, TopicPartition tp, String value) {
        manager.maybeUpdateProducerIdAndEpoch(tp);

        // Capture the sequence for this batch before reserving it.
        final int baseSequence = manager.sequenceNumber(tp);
        manager.incrementSequenceNumber(tp, 1);

        final ProducerBatch idempotentBatch = batchWithValue(tp, value);
        idempotentBatch.setProducerState(manager.producerIdAndEpoch(), baseSequence, false);
        manager.addInFlightBatch(idempotentBatch);
        idempotentBatch.close();
        return idempotentBatch;
    }

    /**
     * Builds an open {@link ProducerBatch} for {@code tp} holding one record with an empty key,
     * no headers and {@code value} as its payload. The mock clock's current time is used for
     * both the batch creation time and the record timestamp.
     *
     * @param tp    partition the batch belongs to
     * @param value record value; encoded with {@code String.getBytes()}
     * @return the batch with the single record appended (not closed)
     */
    private ProducerBatch batchWithValue(TopicPartition tp, String value) {
        final long nowMs = time.milliseconds();
        final byte[] emptyKey = new byte[0];
        final Header[] noHeaders = new Header[0];

        // 64-byte uncompressed buffer with create-time timestamps and base offset 0.
        final MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(64), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        final ProducerBatch singleRecordBatch = new ProducerBatch(tp, recordsBuilder, nowMs);
        singleRecordBatch.tryAppend(nowMs, emptyKey, value.getBytes(), noHeaders, null, nowMs);
        return singleRecordBatch;
    }

    /**
     * Checks that the per-partition sequence number wraps around instead of overflowing:
     * starting from 0, adding {@code Integer.MAX_VALUE} gives {@code Integer.MAX_VALUE},
     * adding 100 more gives 99, and adding {@code Integer.MAX_VALUE} again gives 98.
     */
    @Test
    public void testSequenceNumberOverflow() {
        this.initializeTransactionManager(Optional.empty());
        // assertEquals takes (expected, actual); the expected literal goes first so that
        // failure messages report the values the right way round.
        Assertions.assertEquals(0, (int) this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp0, Integer.MAX_VALUE);
        Assertions.assertEquals(Integer.MAX_VALUE, (int) this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp0, 100);
        Assertions.assertEquals(99, (int) this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp0, Integer.MAX_VALUE);
        Assertions.assertEquals(98, (int) this.transactionManager.sequenceNumber(this.tp0));
    }

    /**
     * Checks that requesting an epoch bump for {@code tp0} (with the producer already at epoch
     * {@code Short.MAX_VALUE}) resets the sequence number of {@code tp0} to 0 while leaving the
     * sequence number of {@code tp1} at 3.
     */
    @Test
    public void testProducerIdReset() {
        this.initializeTransactionManager(Optional.empty());
        this.initializeIdempotentProducerId(15L, Short.MAX_VALUE);
        // assertEquals takes (expected, actual); expected values are passed first throughout.
        Assertions.assertEquals(0, (int) this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals(0, (int) this.transactionManager.sequenceNumber(this.tp1));
        this.transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assertions.assertEquals(3, (int) this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp1, 3);
        Assertions.assertEquals(3, (int) this.transactionManager.sequenceNumber(this.tp1));

        // Only tp0 asks for the bump; tp1 must keep its sequence.
        this.transactionManager.requestEpochBumpForPartition(this.tp0);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(0, (int) this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals(3, (int) this.transactionManager.sequenceNumber(this.tp1));
    }

    /**
     * Walks one transaction through its stages: adding {@code tp0}, producing a record,
     * sending consumer offsets for {@code tp1}, and committing. After each stage the test
     * asserts the transaction manager state reached by then.
     */
    @Test
    public void testBasicTransaction() throws InterruptedException {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;
        final FindCoordinatorRequest.CoordinatorType groupCoordinator = FindCoordinatorRequest.CoordinatorType.GROUP;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(sendFuture.isDone());

        // Partition registration must finish before sends to tp0 are allowed.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.NONE, pid, producerEpoch);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(sendFuture.isDone());
        runUntil(sendFuture::isDone);

        // Offsets: AddOffsetsToTxn first, then group coordinator lookup, then TxnOffsetCommit.
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        offsetsToCommit.put(tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));
        Assertions.assertFalse(transactionManager.hasPendingOffsetCommits());
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        runUntil(transactionManager::hasPendingOffsetCommits);
        Assertions.assertFalse(offsetsResult.isCompleted());

        Map<TopicPartition, Errors> commitErrors = new HashMap<>();
        commitErrors.put(tp1, Errors.NONE);
        prepareFindCoordinatorResponse(Errors.NONE, false, groupCoordinator, groupId);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, commitErrors);
        Assertions.assertNull(transactionManager.coordinator(groupCoordinator));
        runUntil(() -> transactionManager.coordinator(groupCoordinator) != null);
        Assertions.assertTrue(transactionManager.hasPendingOffsetCommits());
        runUntil(() -> !transactionManager.hasPendingOffsetCommits());
        Assertions.assertTrue(offsetsResult.isCompleted());

        // Commit and verify the transaction state is cleared.
        transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, producerEpoch);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(transactionManager.isCompleting());
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * Queues two FindCoordinator responses for transactional id {@code "foobar"} and checks
     * that the transaction coordinator is unset after the first and equals {@code brokerNode}
     * after the second.
     */
    @Test
    public void testDisconnectAndRetry() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final String transactionalId = "foobar";

        transactionManager.initializeTransactions();

        // NOTE(review): the second argument (true) is presumably the "disconnect" flag - confirm
        // against prepareFindCoordinatorResponse.
        prepareFindCoordinatorResponse(Errors.NONE, true, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) == null);

        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
    }

    /**
     * Checks that calling {@code initializeTransactions()} again, after initialization has
     * already produced a producer id, throws {@link IllegalStateException}.
     */
    @Test
    public void testInitializeTransactionsTwiceRaisesError() {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(pid, producerEpoch);
        Assertions.assertTrue(transactionManager.hasProducerId());

        // A second initialization attempt must be rejected.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
    }

    /**
     * Checks that an unsupported-version response to the FindCoordinator request puts the
     * transaction manager into a fatal error state whose last error is an
     * {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedFindCoordinator() {
        this.transactionManager.initializeTransactions();
        this.client.prepareUnsupportedVersionResponse(body -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
            // assertEquals takes (expected, actual): the expected constants come first so a
            // mismatch is reported the right way round.
            Assertions.assertEquals(FindCoordinatorRequest.CoordinatorType.TRANSACTION, FindCoordinatorRequest.CoordinatorType.forId((byte) findCoordinatorRequest.data().keyType()));
            Assertions.assertEquals("foobar", findCoordinatorRequest.data().key());
            return true;
        });
        this.runUntil(() -> this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * Checks that, once the transaction coordinator is known, an unsupported-version response
     * to the InitProducerId request puts the transaction manager into a fatal error state
     * whose last error is an {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedInitTransactions() {
        this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertFalse(this.transactionManager.hasError());
        this.client.prepareUnsupportedVersionResponse(body -> {
            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
            // assertEquals takes (expected, actual): expected transactional id and timeout first.
            Assertions.assertEquals("foobar", initProducerIdRequest.data().transactionalId());
            Assertions.assertEquals(1121, (int) initProducerIdRequest.data().transactionTimeoutMs());
            return true;
        });
        this.runUntil(() -> this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * Checks that an {@code UNSUPPORTED_FOR_MESSAGE_FORMAT} partition error in the
     * TxnOffsetCommit response fails the send-offsets result with
     * {@code UnsupportedForMessageFormatException} and is treated as a fatal error.
     */
    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(partition, new OffsetAndMetadata(39L));
        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));

        // Queue the three responses in the order the requests are expected.
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, Collections.singletonMap(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof UnsupportedForMessageFormatException);
        assertFatalError(UnsupportedForMessageFormatException.class);
    }

    /**
     * Sends offsets with group metadata carrying member id {@code "fenced_member"} and group
     * instance id {@code "instance"}, answers the TxnOffsetCommit with
     * {@code FENCED_INSTANCE_ID}, and checks that the result fails with
     * {@code FencedInstanceIdException} as an abortable error.
     */
    @Test
    public void testFencedInstanceIdInTxnOffsetCommitByGroupMetadata() {
        TopicPartition tp = new TopicPartition("foo", 0);
        String fencedMemberId = "fenced_member";
        this.doInitTransactions();
        this.transactionManager.beginTransaction();
        // The member id local is now actually passed in, rather than a duplicated literal.
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("myConsumerGroup", 5, fencedMemberId, Optional.of("instance")));
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        this.runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        this.client.prepareResponse(request -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals("myConsumerGroup", txnOffsetCommitRequest.data().groupId());
            Assertions.assertEquals(13131L, txnOffsetCommitRequest.data().producerId());
            Assertions.assertEquals((short) 1, txnOffsetCommitRequest.data().producerEpoch());
            // Matches when the instance id is forwarded and the member id is not "member".
            return txnOffsetCommitRequest.data().groupInstanceId().equals("instance") && !txnOffsetCommitRequest.data().memberId().equals("member");
        }, new TxnOffsetCommitResponse(0, Collections.singletonMap(tp, Errors.FENCED_INSTANCE_ID)));
        this.runUntil(() -> this.transactionManager.hasError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof FencedInstanceIdException);
        Assertions.assertTrue(sendOffsetsResult.isCompleted());
        Assertions.assertFalse(sendOffsetsResult.isSuccessful());
        Assertions.assertTrue(sendOffsetsResult.error() instanceof FencedInstanceIdException);
        this.assertAbortableError(FencedInstanceIdException.class);
    }

    /**
     * Sends offsets with group metadata carrying member id {@code "unknownMember"}, answers the
     * TxnOffsetCommit with {@code UNKNOWN_MEMBER_ID}, and checks that the result fails with
     * {@code CommitFailedException} as an abortable error.
     */
    @Test
    public void testUnknownMemberIdInTxnOffsetCommitByGroupMetadata() {
        TopicPartition tp = new TopicPartition("foo", 0);
        String unknownMemberId = "unknownMember";
        this.doInitTransactions();
        this.transactionManager.beginTransaction();
        // The member id local is now actually passed in, rather than a duplicated literal.
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("myConsumerGroup", 5, unknownMemberId, Optional.empty()));
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        this.runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        this.client.prepareResponse(request -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals("myConsumerGroup", txnOffsetCommitRequest.data().groupId());
            Assertions.assertEquals(13131L, txnOffsetCommitRequest.data().producerId());
            Assertions.assertEquals((short) 1, txnOffsetCommitRequest.data().producerEpoch());
            // Matches when the member id sent is anything other than "member".
            return !txnOffsetCommitRequest.data().memberId().equals("member");
        }, new TxnOffsetCommitResponse(0, Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID)));
        this.runUntil(() -> this.transactionManager.hasError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof CommitFailedException);
        Assertions.assertTrue(sendOffsetsResult.isCompleted());
        Assertions.assertFalse(sendOffsetsResult.isSuccessful());
        Assertions.assertTrue(sendOffsetsResult.error() instanceof CommitFailedException);
        this.assertAbortableError(CommitFailedException.class);
    }

    /**
     * Sends offsets with group metadata carrying generation id 1 and an empty member id,
     * answers the TxnOffsetCommit with {@code ILLEGAL_GENERATION}, and checks that the result
     * fails with {@code CommitFailedException} as an abortable error.
     */
    @Test
    public void testIllegalGenerationInTxnOffsetCommitByGroupMetadata() {
        TopicPartition tp = new TopicPartition("foo", 0);
        // Typed as the int generation id it represents and passed to the group metadata below
        // (previously an unused boolean next to a duplicated literal 1).
        int illegalGenerationId = 1;
        this.doInitTransactions();
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("myConsumerGroup", illegalGenerationId, "", Optional.empty()));
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        this.runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        this.prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION));
        // NOTE(review): two ILLEGAL_GENERATION responses are queued back to back; the second
        // (with the generation-id matcher) may never be consumed - confirm whether both are
        // intended.
        this.client.prepareResponse(request -> {
            TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals("myConsumerGroup", txnOffsetCommitRequest.data().groupId());
            Assertions.assertEquals(13131L, txnOffsetCommitRequest.data().producerId());
            Assertions.assertEquals((short) 1, txnOffsetCommitRequest.data().producerEpoch());
            return txnOffsetCommitRequest.data().generationId() != 5;
        }, new TxnOffsetCommitResponse(0, Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION)));
        this.runUntil(() -> this.transactionManager.hasError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof CommitFailedException);
        Assertions.assertTrue(sendOffsetsResult.isCompleted());
        Assertions.assertFalse(sendOffsetsResult.isSuccessful());
        Assertions.assertTrue(sendOffsetsResult.error() instanceof CommitFailedException);
        this.assertAbortableError(CommitFailedException.class);
    }

    /**
     * Checks that when the first InitProducerId exchange leaves the transaction coordinator
     * unset, the manager looks the coordinator up again and then completes initialization with
     * producer id 13131 and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final String transactionalId = "foobar";
        final long pid = 13131L;
        final short producerEpoch = 1;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // NOTE(review): the 'true' flag is presumably "disconnect after send" - confirm against
        // prepareInitPidResponse. Afterwards the coordinator is unset and nothing has completed.
        prepareInitPidResponse(Errors.NONE, true, pid, producerEpoch);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) == null);
        Assertions.assertNull(transactionManager.coordinator(txnCoordinator));
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        // Second lookup finds the coordinator again; initialization is still pending.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, producerEpoch);
        runUntil(initResult::isCompleted);
        Assertions.assertTrue(initResult.isCompleted());
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(producerEpoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Checks that disconnecting the mock client from {@code brokerNode} (with a 100 ms backoff)
     * after the coordinator was found unsets the coordinator, and that a fresh lookup followed
     * by a successful InitProducerId response completes initialization with producer id 13131
     * and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final String transactionalId = "foobar";
        final long pid = 13131L;
        final short producerEpoch = 1;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // Drop the connection and back the node off before anything else is exchanged.
        client.disconnect(brokerNode.idString());
        client.backoff(brokerNode, 100L);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) == null);

        // Move the mock clock past the 100 ms backoff.
        time.sleep(110L);
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, producerEpoch);
        runUntil(initResult::isCompleted);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(producerEpoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Checks that a {@code NOT_COORDINATOR} error on InitProducerId unsets the transaction
     * coordinator, and that a fresh lookup followed by a successful InitProducerId response
     * completes initialization with producer id 13131 and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final String transactionalId = "foobar";
        final long pid = 13131L;
        final short producerEpoch = 1;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // The broker reports it is not the coordinator: the cached coordinator is dropped.
        prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, producerEpoch);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) == null);
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, transactionalId);
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, producerEpoch);
        runUntil(initResult::isCompleted);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(producerEpoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Checks that {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} on the FindCoordinator response
     * is a fatal error: the init result is unsuccessful and awaiting it throws
     * {@code TransactionalIdAuthorizationException}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, txnCoordinator, "foobar");
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.hasFatalError());
        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertFalse(initResult.isSuccessful());
        Assertions.assertThrows(TransactionalIdAuthorizationException.class, initResult::await);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Checks that {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} on the InitProducerId response
     * is a fatal error: the init result completes unsuccessfully and awaiting it throws
     * {@code TransactionalIdAuthorizationException}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // The coordinator lookup itself succeeds.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // The authorization failure arrives on InitProducerId.
        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, 13131L, (short) -1);
        runUntil(transactionManager::hasError);
        Assertions.assertTrue(initResult.isCompleted());
        Assertions.assertFalse(initResult.isSuccessful());
        Assertions.assertThrows(TransactionalIdAuthorizationException.class, initResult::await);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Checks that {@code GROUP_AUTHORIZATION_FAILED} on the group coordinator lookup fails the
     * send-offsets result with a {@code GroupAuthorizationException} naming
     * {@code "myConsumerGroup"}, and that the error is abortable.
     */
    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L));
        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        // The lookup of the group coordinator is what gets rejected.
        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        runUntil(transactionManager::hasError);
        Assertions.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        runUntil(offsetsResult::isCompleted);
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof GroupAuthorizationException);

        GroupAuthorizationException authFailure = (GroupAuthorizationException) offsetsResult.error();
        Assertions.assertEquals(groupId, authFailure.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * Checks that {@code GROUP_AUTHORIZATION_FAILED} in the TxnOffsetCommit response fails the
     * send-offsets result with a {@code GroupAuthorizationException} naming
     * {@code "myConsumerGroup"}, leaves no pending offset commits, and is abortable.
     */
    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;
        // Local partition on topic "foo"; it shadows the test fixture's tp1 field.
        TopicPartition tp1 = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(tp1, new OffsetAndMetadata(39L));
        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, Collections.singletonMap(tp1, Errors.GROUP_AUTHORIZATION_FAILED));
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof GroupAuthorizationException);
        Assertions.assertFalse(transactionManager.hasPendingOffsetCommits());

        GroupAuthorizationException authFailure = (GroupAuthorizationException) offsetsResult.error();
        Assertions.assertEquals(groupId, authFailure.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * Checks that {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} on the AddOffsetsToTxn response
     * fails the send-offsets result with {@code TransactionalIdAuthorizationException} and is
     * treated as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        final String groupId = "myConsumerGroup";
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(partition, new OffsetAndMetadata(39L));
        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));

        // The very first request of the offsets flow is rejected.
        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, groupId, 13131L, (short) 1);
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Lets AddOffsetsToTxn and the group FindCoordinator succeed, then answers TxnOffsetCommit
     * with TRANSACTIONAL_ID_AUTHORIZATION_FAILED for the committed partition. The pending
     * sendOffsetsToTransaction result must complete unsuccessfully with a
     * {@link TransactionalIdAuthorizationException}, which is asserted via {@code assertFatalError}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        String groupId = "myConsumerGroup";
        TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(partition, new OffsetAndMetadata(39L)),
                new ConsumerGroupMetadata(groupId));

        // Step 1: AddOffsetsToTxn succeeds.
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, 13131L, (short) 1);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        // Step 2: the group coordinator is found, but the offset commit itself is rejected.
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, 13131L, (short) 1,
                Collections.singletonMap(partition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Adds two partitions and answers AddPartitionsToTxn with TOPIC_AUTHORIZATION_FAILED for one
     * and OPERATION_NOT_ATTEMPTED for the other. Neither partition may end up added or pending,
     * the reported unauthorized topics must contain only the rejected topic, and both appended
     * records must fail with a {@link KafkaException} after one more sender pass.
     */
    @Test
    public void testTopicAuthorizationFailureInAddPartitions() throws InterruptedException {
        TopicPartition rejectedPartition = new TopicPartition("foo", 0);
        TopicPartition skippedPartition = new TopicPartition("bar", 0);
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(rejectedPartition);
        transactionManager.maybeAddPartition(skippedPartition);
        FutureRecordMetadata rejectedAppend = appendToAccumulator(rejectedPartition);
        FutureRecordMetadata skippedAppend = appendToAccumulator(skippedPartition);

        HashMap<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(rejectedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        partitionErrors.put(skippedPartition, Errors.OPERATION_NOT_ATTEMPTED);
        prepareAddPartitionsToTxn(partitionErrors);
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
        // Neither partition is left pending, and neither made it into the transaction.
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(rejectedPartition));
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(skippedPartition));
        Assertions.assertFalse(transactionManager.isPartitionAdded(rejectedPartition));
        Assertions.assertFalse(transactionManager.isPartitionAdded(skippedPartition));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());

        TopicAuthorizationException authFailure = (TopicAuthorizationException) transactionManager.lastError();
        Assertions.assertEquals(Collections.singleton(rejectedPartition.topic()), authFailure.unauthorizedTopics());
        assertAbortableError(TopicAuthorizationException.class);

        // One more pass of the sender is needed before the appended records are failed.
        sender.runOnce();
        TestUtils.assertFutureThrows(rejectedAppend, KafkaException.class);
        TestUtils.assertFutureThrows(skippedAppend, KafkaException.class);
    }

    /**
     * Begins a commit while the AddPartitionsToTxn request is still outstanding, then answers
     * that request with a topic authorization failure. The error is recorded after the first
     * sender pass, while the commit result and both appended records are only completed
     * (exceptionally) on the following pass.
     */
    @Test
    public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() throws InterruptedException {
        TopicPartition rejectedPartition = new TopicPartition("foo", 0);
        TopicPartition skippedPartition = new TopicPartition("bar", 0);
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(rejectedPartition);
        transactionManager.maybeAddPartition(skippedPartition);
        FutureRecordMetadata rejectedAppend = appendToAccumulator(rejectedPartition);
        FutureRecordMetadata skippedAppend = appendToAccumulator(skippedPartition);

        // Commit is requested before any response for the add-partitions request exists.
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        sender.runOnce();
        Assertions.assertFalse(transactionManager.hasError());
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertFalse(rejectedAppend.isDone());

        HashMap<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(rejectedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        partitionErrors.put(skippedPartition, Errors.OPERATION_NOT_ATTEMPTED);

        // Respond to the in-flight request, checking that it covers exactly the two partitions.
        client.respond(body -> {
            AddPartitionsToTxnRequest addPartitionsRequest = (AddPartitionsToTxnRequest) body;
            Assertions.assertEquals(
                    new HashSet<>(addPartitionsRequest.partitions()),
                    new HashSet<>(partitionErrors.keySet()));
            return true;
        }, new AddPartitionsToTxnResponse(0, partitionErrors));

        sender.runOnce();
        Assertions.assertTrue(transactionManager.hasError());
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertFalse(rejectedAppend.isDone());
        Assertions.assertFalse(skippedAppend.isDone());

        sender.runOnce();
        Assertions.assertTrue(commitResult.isCompleted());
        TestUtils.assertFutureThrows(rejectedAppend, KafkaException.class);
        TestUtils.assertFutureThrows(skippedAppend, KafkaException.class);
        Assertions.assertTrue(commitResult.error() instanceof TopicAuthorizationException);
    }

    /**
     * Hits an abortable topic authorization error on the only partition of a transaction,
     * aborts, and then verifies that a fresh transaction on {@code tp0} can add its partition,
     * produce and commit. No EndTxn response is prepared for the abort in this test.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();

        // --- First transaction: its single partition is rejected. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata failedAppend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(
                Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        runUntil(failedAppend::isDone);
        assertProduceFutureFailed(failedAppend);

        runUntil(transactionManager::isReady);
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // --- Second transaction: everything succeeds. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata retriedAppend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(retriedAppend::isDone);
        Assertions.assertNotNull(retriedAppend.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(transactionManager::isReady);
    }

    /**
     * Simulates a caller whose wait on an abort timed out. Once the EndTxn response arrives the
     * result is successful but not yet acked; in that window every other transition throws
     * {@link IllegalStateException}, while calling {@code beginAbort()} again returns the same
     * result object. After awaiting it, a new transaction can be started.
     */
    @Test
    public void testRetryAbortTransactionAfterTimeout() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        appendToAccumulator(tp0);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        // Nothing has been sent yet, so a zero-length wait must time out.
        Assertions.assertThrows(TimeoutException.class, () -> abortResult.await(0L, TimeUnit.MILLISECONDS));

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(transactionManager::isReady);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertFalse(abortResult.isAcked());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        // Until the result has been awaited, only a retry of the same operation is accepted.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.maybeAddPartition(tp0));
        Assertions.assertSame(abortResult, transactionManager.beginAbort());

        abortResult.await();
        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
    }

    /**
     * Simulates a caller whose wait on a commit timed out. Once the EndTxn response arrives the
     * result is successful but not yet acked; in that window every other transition throws
     * {@link IllegalStateException}, while calling {@code beginCommit()} again returns the same
     * result object. After awaiting it, a new transaction can be started.
     */
    @Test
    public void testRetryCommitTransactionAfterTimeout() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator(tp0);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        // Nothing has been sent yet, so a zero-length wait must time out.
        Assertions.assertThrows(TimeoutException.class, () -> commitResult.await(0L, TimeUnit.MILLISECONDS));

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(transactionManager::isReady);
        Assertions.assertTrue(commitResult.isSuccessful());
        Assertions.assertFalse(commitResult.isAcked());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        // Until the result has been awaited, only a retry of the same operation is accepted.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.maybeAddPartition(tp0));
        Assertions.assertSame(commitResult, transactionManager.beginCommit());

        commitResult.await();
        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
    }

    /**
     * Simulates a caller whose wait on initializeTransactions timed out. Once the InitProducerId
     * response arrives the result is successful but not yet acked; in that window begin/abort/
     * commit/add-partition throw {@link IllegalStateException}, while calling
     * {@code initializeTransactions()} again returns the same result object. After awaiting it,
     * a further {@code initializeTransactions()} is rejected and a transaction can be started.
     */
    @Test
    public void testRetryInitTransactionsAfterTimeout() {
        FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // The coordinator is known but no producer id has been handed out yet.
        Assertions.assertThrows(TimeoutException.class, () -> initResult.await(0L, TimeUnit.MILLISECONDS));

        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(transactionManager::hasProducerId);
        Assertions.assertTrue(initResult.isSuccessful());
        Assertions.assertFalse(initResult.isAcked());

        // Until the result has been awaited, only a retry of the same operation is accepted.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.maybeAddPartition(tp0));
        Assertions.assertSame(initResult, transactionManager.initializeTransactions());

        initResult.await();
        Assertions.assertTrue(initResult.isAcked());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
    }

    /**
     * Starts a transaction in which {@code tp0} is added successfully, then hits an abortable
     * topic authorization error when adding a second partition. After aborting, both appended
     * records must have failed, and a fresh transaction on {@code tp0} must be able to add its
     * partition, produce and commit.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();

        // --- First transaction: tp0 is accepted, the second partition is rejected. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        // NOTE(review): this first record is appended to the unauthorized partition even though
        // tp0 is the partition registered above - confirm this is intentional.
        FutureRecordMetadata firstAppend = appendToAccumulator(unauthorizedPartition);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata secondAppend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(
                Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(transactionManager::hasAbortableError);

        Assertions.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assertions.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assertions.assertFalse(firstAppend.isDone());
        Assertions.assertFalse(secondAppend.isDone());

        // --- Abort and verify the manager is clean again. ---
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        runUntil(transactionManager::isReady);
        assertProduceFutureFailed(firstAppend);
        assertProduceFutureFailed(secondAppend);
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // --- Second transaction: everything succeeds. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata nextTransactionAppend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(nextTransactionAppend::isDone);
        Assertions.assertNotNull(nextTransactionAppend.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(transactionManager::isReady);
    }

    /**
     * Exercises an abortable error that occurs while a produce request for an already-added
     * partition is being retried. The retried record still completes successfully, the record
     * for the unauthorized partition fails, the abort succeeds, and a subsequent transaction on
     * {@code tp0} can add its partition, produce and commit.
     */
    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();

        // --- First transaction: tp0 is added and its record is appended. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        FutureRecordMetadata authorizedAppend = appendToAccumulator(tp0);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        // The first produce attempt is answered with REQUEST_TIMED_OUT; the record stays incomplete.
        accumulator.beginFlush();
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short) 1);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(authorizedAppend.isDone());
        Assertions.assertTrue(accumulator.hasIncomplete());

        // Adding a second, unauthorized partition puts the manager into an abortable error.
        transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata unauthorizedAppend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(
                Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(transactionManager::hasAbortableError);
        Assertions.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assertions.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assertions.assertFalse(authorizedAppend.isDone());

        // The next produce response is a success and completes the retried record.
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(authorizedAppend::isDone);
        assertProduceFutureFailed(unauthorizedAppend);
        Assertions.assertNotNull(authorizedAppend.get());
        Assertions.assertTrue(authorizedAppend.isDone());

        // --- Abort and verify the manager is clean again. ---
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        runUntil(transactionManager::isReady);
        Assertions.assertTrue(transactionManager.isReady());
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // --- Second transaction: everything succeeds. ---
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata nextTransactionAppend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(nextTransactionAppend::isDone);
        Assertions.assertNotNull(nextTransactionAppend.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(transactionManager::isReady);
    }

    /**
     * Answers AddPartitionsToTxn with TRANSACTIONAL_ID_AUTHORIZATION_FAILED and checks that the
     * recorded error is a {@link TransactionalIdAuthorizationException}, which is then asserted
     * via {@code assertFatalError}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(partition);

        prepareAddPartitionsToTxn(partition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Begins a commit while the partition has not yet been added to the transaction. The commit
     * must not complete until the partition has been added, the appended record has been
     * produced, and the EndTxn response has been processed.
     */
    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata pendingAppend = appendToAccumulator(tp0);
        Assertions.assertFalse(pendingAppend.isDone());

        // Commit is requested before tp0 is part of the transaction.
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(pendingAppend.isDone());
        Assertions.assertFalse(commitResult.isCompleted());

        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(pendingAppend::isDone);

        // The record is done, but the commit still waits for the EndTxn round trip.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
        Assertions.assertTrue(transactionManager.isCompleting());

        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Adds {@code tp0} and then {@code tp1} to the transaction in two separate AddPartitionsToTxn
     * round trips while two records are waiting in the accumulator. Neither record may complete
     * before {@code tp1} has been added; once the first record is done, the second must be done
     * as well.
     */
    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();

        // First round trip: tp0.
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata firstAppend = appendToAccumulator(tp0);
        Assertions.assertFalse(firstAppend.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));

        // Second round trip: tp1 is registered while another record is appended to tp0.
        transactionManager.maybeAddPartition(tp1);
        FutureRecordMetadata secondAppend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp1));
        Assertions.assertFalse(firstAppend.isDone());
        Assertions.assertFalse(secondAppend.isDone());

        runUntil(() -> transactionManager.transactionContainsPartition(tp1));
        Assertions.assertFalse(firstAppend.isDone());
        Assertions.assertFalse(secondAppend.isDone());

        runUntil(firstAppend::isDone);
        Assertions.assertTrue(secondAppend.isDone());
    }

    /**
     * For each listed error, answers every step of a transaction (FindCoordinator,
     * InitProducerId, AddPartitionsToTxn, EndTxn) first with an error and then with success, and
     * checks that the commit still completes successfully.
     *
     * @param error the error injected before the successful response of each step; for
     *              AddPartitionsToTxn, CONCURRENT_TRANSACTIONS is replaced by
     *              COORDINATOR_LOAD_IN_PROGRESS
     */
    @ParameterizedTest
    @EnumSource(names={"UNKNOWN_TOPIC_OR_PARTITION", "REQUEST_TIMED_OUT", "COORDINATOR_LOAD_IN_PROGRESS", "CONCURRENT_TRANSACTIONS"})
    public void testRetriableErrors2(Errors error) {
        FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        // FindCoordinator: error, then success.
        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(error, false, txnCoordinator, "foobar");
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // InitProducerId: error, then success.
        prepareInitPidResponse(error, false, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(transactionManager::hasProducerId);
        initResult.await();

        // AddPartitionsToTxn: error (with one substitution), then success.
        transactionManager.beginTransaction();
        Errors addPartitionsToTxnError = error;
        if (error.equals(Errors.CONCURRENT_TRANSACTIONS)) {
            addPartitionsToTxnError = Errors.COORDINATOR_LOAD_IN_PROGRESS;
        }
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, (short) 1, 13131L);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));

        // EndTxn (commit): error, then success.
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        prepareEndTxnResponse(error, TransactionResult.COMMIT, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(commitResult::isCompleted);
        Assertions.assertTrue(commitResult.isSuccessful());
    }

    /**
     * Answers the first FindCoordinator request with COORDINATOR_NOT_AVAILABLE and the second
     * with success, then checks that initialization still finishes once a producer id is
     * returned.
     */
    @Test
    public void testCoordinatorNotAvailable() {
        FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, txnCoordinator, "foobar");
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(transactionManager::hasProducerId);
        initResult.await();
    }

    /** Runs the InitProducerId fencing scenario with a PRODUCER_FENCED response. */
    @Test
    public void testProducerFencedExceptionInInitProducerId() {
        Errors injectedError = Errors.PRODUCER_FENCED;
        verifyProducerFencedForInitProducerId(injectedError);
    }

    /** Runs the InitProducerId fencing scenario with an INVALID_PRODUCER_EPOCH response. */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInInitProducerId() {
        Errors injectedError = Errors.INVALID_PRODUCER_EPOCH;
        verifyProducerFencedForInitProducerId(injectedError);
    }

    /**
     * Answers InitProducerId with the given error and checks that awaiting the initialization
     * result, as well as every subsequent transactional operation, throws
     * {@link ProducerFencedException}.
     *
     * @param error the error returned in the InitProducerId response
     */
    private void verifyProducerFencedForInitProducerId(Errors error) {
        FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        prepareInitPidResponse(error, false, 13131L, (short) 1);
        runUntil(transactionManager::hasError);

        Assertions.assertThrows(ProducerFencedException.class, () -> initResult.await());
        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(ProducerFencedException.class,
                () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /** Runs the AddPartitionsToTxn fencing scenario with a PRODUCER_FENCED response. */
    @Test
    public void testProducerFencedInAddPartitionToTxn() throws InterruptedException {
        Errors injectedError = Errors.PRODUCER_FENCED;
        verifyProducerFencedForAddPartitionsToTxn(injectedError);
    }

    /** Runs the AddPartitionsToTxn fencing scenario with an INVALID_PRODUCER_EPOCH response. */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn() throws InterruptedException {
        Errors injectedError = Errors.INVALID_PRODUCER_EPOCH;
        verifyProducerFencedForAddPartitionsToTxn(injectedError);
    }

    /**
     * Starts a transaction on {@code tp0}, appends one record, answers AddPartitionsToTxn with
     * the given error and then delegates to {@code verifyProducerFenced} for the assertions.
     *
     * @param error the error returned in the AddPartitionsToTxn response
     * @throws InterruptedException propagated from appending to the accumulator or from the
     *         delegated verification
     */
    private void verifyProducerFencedForAddPartitionsToTxn(Errors error) throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata pendingAppend = appendToAccumulator(tp0);
        Assertions.assertFalse(pendingAppend.isDone());

        prepareAddPartitionsToTxnResponse(error, tp0, (short) 1, 13131L);
        verifyProducerFenced(pendingAppend);
    }

    /**
     * Runs the AddOffsetsToTxn fencing scenario with a PRODUCER_FENCED response.
     *
     * <p>The INVALID_PRODUCER_EPOCH case is covered by
     * {@code testInvalidProducerEpochConvertToProducerFencedInAddOffSetsToTxn}; this test must
     * inject PRODUCER_FENCED so that the two tests do not exercise the same error code.
     */
    @Test
    public void testProducerFencedInAddOffSetsToTxn() throws InterruptedException {
        this.verifyProducerFencedForAddOffsetsToTxn(Errors.PRODUCER_FENCED);
    }

    /** Runs the AddOffsetsToTxn fencing scenario with an INVALID_PRODUCER_EPOCH response. */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInAddOffSetsToTxn() throws InterruptedException {
        Errors injectedError = Errors.INVALID_PRODUCER_EPOCH;
        verifyProducerFencedForAddOffsetsToTxn(injectedError);
    }

    /**
     * Starts a transaction, requests an (empty) offset commit for group "myConsumerGroup",
     * appends one record to {@code tp0}, answers AddOffsetsToTxn with the given error and then
     * delegates to {@code verifyProducerFenced} for the assertions.
     *
     * @param error the error returned in the AddOffsetsToTxn response
     * @throws InterruptedException propagated from appending to the accumulator or from the
     *         delegated verification
     */
    private void verifyProducerFencedForAddOffsetsToTxn(Errors error) throws InterruptedException {
        String groupId = "myConsumerGroup";
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(groupId));

        FutureRecordMetadata pendingAppend = appendToAccumulator(tp0);
        Assertions.assertFalse(pendingAppend.isDone());

        prepareAddOffsetsToTxnResponse(error, groupId, 13131L, (short) 1);
        verifyProducerFenced(pendingAppend);
    }

    /**
     * Drives the sender until the given produce future is done, then checks that the manager
     * has an error, that the future failed with a {@link ProducerFencedException} as cause, and
     * that every subsequent transactional operation throws {@link ProducerFencedException}.
     *
     * @param responseFuture the future of a record appended before the fencing error was injected
     * @throws InterruptedException if interrupted while reading the future
     */
    private void verifyProducerFenced(Future<RecordMetadata> responseFuture) throws InterruptedException {
        runUntil(responseFuture::isDone);
        Assertions.assertTrue(transactionManager.hasError());

        try {
            responseFuture.get();
            Assertions.fail("Expected to get a ExecutionException from the response");
        } catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assertions.assertTrue(cause instanceof ProducerFencedException);
        }

        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(ProducerFencedException.class,
                () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /**
     * Answers the EndTxn (commit) request with INVALID_PRODUCER_EPOCH after the partition was
     * added and the record was produced successfully. The commit result must complete
     * unsuccessfully (and be acked once awaited), and every subsequent transactional operation
     * must throw a {@link KafkaException}.
     */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        FutureRecordMetadata pendingAppend = appendToAccumulator(tp0);
        Assertions.assertFalse(pendingAppend.isDone());

        // Add-partitions and produce succeed; only the final EndTxn is rejected.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(commitResult::isCompleted);
        runUntil(pendingAppend::isDone);

        Assertions.assertThrows(KafkaException.class, () -> commitResult.await());
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertTrue(commitResult.isAcked());

        Assertions.assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(KafkaException.class,
                () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /**
     * Prepares a produce response carrying INVALID_PRODUCER_EPOCH, then begins an abort
     * and checks that the next two queued requests are an EndTxn followed by an
     * InitProducerId.
     */
    @Test
    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, producerEpoch);
        prepareProduceResponse(Errors.NONE, pid, producerEpoch);

        sender.runOnce();
        runUntil(produceFuture::isDone);
        Assertions.assertTrue(transactionManager.hasError());

        transactionManager.beginAbort();

        // First request handed out after the abort: EndTxn.
        TransactionManager.TxnRequestHandler endTxnHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(endTxnHandler);
        Assertions.assertTrue(endTxnHandler.requestBuilder() instanceof EndTxnRequest.Builder);

        // Second request: InitProducerId.
        TransactionManager.TxnRequestHandler initPidHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(initPidHandler);
        Assertions.assertTrue(initPidHandler.requestBuilder() instanceof InitProducerIdRequest.Builder);
    }

    /**
     * With a commit already requested, a produce response carrying
     * OUT_OF_ORDER_SEQUENCE_NUMBER must fail both the commit and the record future;
     * a subsequent abort is then expected to succeed and leave the manager ready.
     */
    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        TransactionalRequestResult commit = transactionManager.beginCommit();
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, producerEpoch);
        runUntil(commit::isCompleted);

        Assertions.assertThrows(KafkaException.class, commit::await);
        TestUtils.assertFutureThrows(produceFuture, OutOfOrderSequenceException.class);

        // The transaction can still be aborted after the failed commit.
        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, producerEpoch);
        prepareInitPidResponse(Errors.NONE, false, pid, bumpedEpoch);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * After a produce response carrying OUT_OF_ORDER_SEQUENCE_NUMBER puts the manager
     * into an abortable error state, an abort must complete successfully and leave the
     * manager ready.
     */
    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, producerEpoch);
        runUntil(transactionManager::hasAbortableError);

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, producerEpoch);
        prepareInitPidResponse(Errors.NONE, false, pid, bumpedEpoch);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * A produce error (OUT_OF_ORDER_SEQUENCE_NUMBER) that arrives while an abort is
     * already in progress must not put the manager into an error state; the abort is
     * expected to finish successfully.
     */
    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        runUntil(() -> !accumulator.hasUndrained());

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        Assertions.assertTrue(transactionManager.isAborting());
        Assertions.assertFalse(transactionManager.hasError());

        // Deliver the failing produce response while the abort is pending.
        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, producerEpoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, producerEpoch);
        runUntil(produceFuture::isDone);

        // Still aborting, and no error was recorded.
        Assertions.assertTrue(transactionManager.isAborting());
        Assertions.assertFalse(transactionManager.hasError());

        runUntil(abortOutcome::isCompleted);
        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * Begins a commit while the appended record is still undrained and checks that no
     * transactional request is in flight until the produce response has been
     * delivered; only afterwards is the EndTxn sent and the manager becomes ready.
     */
    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertTrue(accumulator.hasUndrained());

        // Commit is requested while the batch has not been drained yet.
        transactionManager.beginCommit();
        runUntil(() -> !accumulator.hasUndrained());
        Assertions.assertTrue(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
        Assertions.assertFalse(produceFuture.isDone());

        // A few more iterations must not change anything without a produce response.
        AtomicInteger iterations = new AtomicInteger(0);
        runUntil(() -> iterations.incrementAndGet() >= 4);
        Assertions.assertFalse(accumulator.hasUndrained());
        Assertions.assertTrue(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
        Assertions.assertFalse(produceFuture.isDone());

        sendProduceResponse(Errors.NONE, pid, producerEpoch);
        runUntil(produceFuture::isDone);
        Assertions.assertFalse(accumulator.hasUndrained());
        Assertions.assertFalse(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());

        // Now a transactional request goes out; answer it with a successful EndTxn.
        runUntil(transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, producerEpoch);
        runUntil(transactionManager::isReady);
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
    }

    /**
     * Begins a commit after the appended record has been drained (flush started) but
     * before its response arrives, and checks that no transactional request is in
     * flight until the produce response has been delivered.
     */
    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());
        Assertions.assertTrue(accumulator.hasUndrained());

        // Flush so that the batch is drained before the commit is requested.
        accumulator.beginFlush();
        runUntil(() -> !accumulator.hasUndrained());
        Assertions.assertFalse(accumulator.hasUndrained());
        Assertions.assertTrue(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());

        transactionManager.beginCommit();

        // A few more iterations must not change anything without a produce response.
        AtomicInteger iterations = new AtomicInteger(0);
        runUntil(() -> iterations.incrementAndGet() >= 4);
        Assertions.assertFalse(accumulator.hasUndrained());
        Assertions.assertTrue(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
        Assertions.assertFalse(produceFuture.isDone());

        sendProduceResponse(Errors.NONE, pid, producerEpoch);
        runUntil(produceFuture::isDone);
        Assertions.assertFalse(accumulator.hasUndrained());
        Assertions.assertFalse(accumulator.hasIncomplete());
        Assertions.assertFalse(transactionManager.hasInFlightRequest());

        // Now a transactional request goes out; answer it with a successful EndTxn.
        runUntil(transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, producerEpoch);
        runUntil(transactionManager::isReady);
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
    }

    /**
     * While in an abortable error state, a NOT_COORDINATOR response to AddPartitionsToTxn
     * must clear the transaction coordinator, and a following FindCoordinator response
     * must restore it; the abortable error state is expected to persist throughout.
     */
    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        runUntil(transactionManager::hasInFlightRequest);
        transactionManager.transitionToAbortableError(new KafkaException());

        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.coordinator(txnType) == null);

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);

        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Aborts right after appending a record, with no broker responses prepared: the
     * abort must complete successfully, the manager must be ready again, and the
     * record future must fail with a {@link KafkaException}.
     */
    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
        TestUtils.assertFutureThrows(produceFuture, KafkaException.class);
    }

    /**
     * After AddPartitionsToTxn was answered with UNKNOWN_TOPIC_OR_PARTITION, an abort
     * with successful AddPartitionsToTxn and EndTxn responses prepared must complete
     * successfully, while the appended record's future fails with a
     * {@link KafkaException}.
     */
    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(pid, producerEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, pid);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(produceFuture.isDone());

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, producerEpoch);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
        TestUtils.assertFutureThrows(produceFuture, KafkaException.class);
    }

    /**
     * After a produce response carrying REQUEST_TIMED_OUT, an abort with successful
     * Produce and EndTxn responses prepared must complete successfully, and the
     * record future must yield metadata for {@code tp0}'s topic.
     */
    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(pid, producerEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, producerEpoch);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(produceFuture.isDone());

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareProduceResponse(Errors.NONE, pid, producerEpoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, producerEpoch);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());

        RecordMetadata metadataOfRecord = produceFuture.get();
        Assertions.assertEquals(tp0.topic(), metadataOfRecord.topic());
    }

    /**
     * An UNKNOWN_TOPIC_OR_PARTITION answer to AddPartitionsToTxn must leave the
     * partition out of the transaction; once a successful response is prepared, the
     * partition is added and the record future completes.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, pid);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));

        // Second attempt succeeds.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.NONE, pid, producerEpoch);
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        runUntil(produceFuture::isDone);
    }

    /** Runs the retriable TxnOffsetCommit scenario with UNKNOWN_TOPIC_OR_PARTITION. */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
    }

    /** Runs the retriable TxnOffsetCommit scenario with COORDINATOR_LOAD_IN_PROGRESS. */
    @Test
    public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
    }

    /**
     * Sends offsets for {@code tp0} and {@code tp1} to the transaction, answers the
     * first TxnOffsetCommit with {@code error} for {@code tp1}, and checks that the
     * result stays incomplete until a second, fully successful response is prepared.
     *
     * @param error per-partition error returned for {@code tp1} in the first TxnOffsetCommit response
     */
    private void testRetriableErrorInTxnOffsetCommit(Errors error) {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;
        final FindCoordinatorRequest.CoordinatorType groupType = FindCoordinatorRequest.CoordinatorType.GROUP;

        doInitTransactions();
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        offsets.put(tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsets =
            transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(sendOffsets.isCompleted());

        // First TxnOffsetCommit response: tp0 succeeds, tp1 fails with the given error.
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, error);
        prepareFindCoordinatorResponse(Errors.NONE, false, groupType, groupId);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, partitionErrors);

        Assertions.assertNull(transactionManager.coordinator(groupType));
        runUntil(() -> transactionManager.coordinator(groupType) != null);
        Assertions.assertTrue(transactionManager.hasPendingOffsetCommits());

        runUntil(transactionManager::hasPendingOffsetCommits);
        Assertions.assertFalse(sendOffsets.isCompleted());

        // Second response succeeds for both partitions.
        partitionErrors.put(tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, partitionErrors);
        runUntil(sendOffsets::isCompleted);
        Assertions.assertTrue(sendOffsets.isSuccessful());
    }

    /** Runs the fatal TxnOffsetCommit scenario with PRODUCER_FENCED. */
    @Test
    public void testHandlingOfProducerFencedErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.PRODUCER_FENCED);
    }

    /** Runs the fatal TxnOffsetCommit scenario with TRANSACTIONAL_ID_AUTHORIZATION_FAILED. */
    @Test
    public void testHandlingOfTransactionalIdAuthorizationFailedErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
    }

    /** Runs the fatal TxnOffsetCommit scenario with INVALID_PRODUCER_EPOCH. */
    @Test
    public void testHandlingOfInvalidProducerEpochErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.INVALID_PRODUCER_EPOCH);
    }

    /** Runs the fatal TxnOffsetCommit scenario with UNSUPPORTED_FOR_MESSAGE_FORMAT. */
    @Test
    public void testHandlingOfUnsupportedForMessageFormatErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
    }

    /**
     * Sends offsets for {@code tp0} and {@code tp1} to the transaction, answers the
     * TxnOffsetCommit with {@code error} for {@code tp1}, and checks that the result
     * completes unsuccessfully with the exception class belonging to {@code error}.
     *
     * @param error per-partition error returned for {@code tp1} in the TxnOffsetCommit response
     */
    private void testFatalErrorInTxnOffsetCommit(Errors error) {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        offsets.put(tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsets =
            transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, producerEpoch);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(sendOffsets.isCompleted());

        // tp0 succeeds, tp1 fails with the given error.
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, error);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, partitionErrors);

        runUntil(sendOffsets::isCompleted);
        Assertions.assertFalse(sendOffsets.isSuccessful());
        Assertions.assertEquals(error.exception().getClass(), sendOffsets.error().getClass());
    }

    /**
     * A TOPIC_AUTHORIZATION_FAILED answer to AddPartitionsToTxn must put the manager
     * into an error state without adding the partition to the transaction.
     */
    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        runUntil(transactionManager::hasError);

        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * After AddPartitionsToTxn failed with TOPIC_AUTHORIZATION_FAILED, an abort must
     * complete successfully even though no EndTxn response is prepared.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, (short) 1, 13131L);
        runUntil(() -> !client.hasPendingResponses());

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        Assertions.assertFalse(abortOutcome.isCompleted());

        runUntil(abortOutcome::isCompleted);
        Assertions.assertTrue(abortOutcome.isSuccessful());
    }

    /**
     * With AddOffsetsToTxn answered by GROUP_AUTHORIZATION_FAILED, an abort must
     * complete successfully and leave the manager ready even though no EndTxn
     * response is prepared.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
        final String groupId = "myConsumerGroup";

        doInitTransactions();
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, groupId, 13131L, (short) 1);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertTrue(transactionManager.isReady());
        Assertions.assertTrue(abortOutcome.isCompleted());
        Assertions.assertTrue(abortOutcome.isSuccessful());
    }

    /**
     * With AddOffsetsToTxn answered by UNKNOWN_SERVER_ERROR, a pending abort must
     * complete unsuccessfully and the manager must report a fatal error.
     */
    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
        final String groupId = "myConsumerGroup";

        doInitTransactions();
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));

        TransactionalRequestResult abortOutcome = transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, groupId, 13131L, (short) 1);
        runUntil(abortOutcome::isCompleted);

        Assertions.assertFalse(abortOutcome.isSuccessful());
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * Commits offsets with full group metadata (instance id, member id, generation 5):
     * the first TxnOffsetCommit response fails {@code tp1} with
     * COORDINATOR_LOAD_IN_PROGRESS and leaves the result pending; the second, fully
     * successful response completes it.
     */
    @Test
    public void testSendOffsetsWithGroupMetadata() {
        final String groupId = "myConsumerGroup";
        final long pid = 13131L;
        final short producerEpoch = 1;

        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);

        TransactionalRequestResult sendOffsets = prepareGroupMetadataCommit(
            () -> prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, "instance", "member", 5, partitionErrors));

        sender.runOnce();
        Assertions.assertTrue(transactionManager.hasPendingOffsetCommits());
        Assertions.assertFalse(sendOffsets.isCompleted());

        // Second attempt: both partitions succeed.
        partitionErrors.put(tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse(groupId, pid, producerEpoch, "instance", "member", 5, partitionErrors);

        sender.runOnce();
        Assertions.assertTrue(sendOffsets.isCompleted());
        Assertions.assertTrue(sendOffsets.isSuccessful());
    }

    /**
     * With the node advertising TXN_OFFSET_COMMIT versions 0 through 2 only, committing
     * offsets with group metadata must fail with an {@link UnsupportedVersionException}
     * that is also recorded as a fatal error.
     */
    @Test
    public void testSendOffsetWithGroupMetadataFailAsAutoDowngradeTxnCommitNotEnabled() {
        client.setNodeApiVersions(NodeApiVersions.create((short) ApiKeys.TXN_OFFSET_COMMIT.id, (short) 0, (short) 2));

        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);

        TransactionalRequestResult sendOffsets = prepareGroupMetadataCommit(
            () -> prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, partitionErrors));

        sender.runOnce();
        Assertions.assertTrue(sendOffsets.isCompleted());
        Assertions.assertFalse(sendOffsets.isSuccessful());
        Assertions.assertTrue(sendOffsets.error() instanceof UnsupportedVersionException);
        assertFatalError(UnsupportedVersionException.class);
    }

    /**
     * Shared setup for the group-metadata offset commit tests: begins a transaction,
     * sends offsets for {@code tp0} and {@code tp1} with full consumer group metadata,
     * completes the AddOffsetsToTxn round trip and runs the sender until the group
     * coordinator is known and an offset commit is pending.
     *
     * @param prepareTxnCommitResponse callback that queues the TxnOffsetCommit response;
     *                                 invoked right after the FindCoordinator response is prepared
     * @return the still pending result of the {@code sendOffsetsToTransaction} call
     */
    private TransactionalRequestResult prepareGroupMetadataCommit(Runnable prepareTxnCommitResponse) {
        final String groupId = "myConsumerGroup";
        final FindCoordinatorRequest.CoordinatorType groupType = FindCoordinatorRequest.CoordinatorType.GROUP;

        doInitTransactions();
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        offsets.put(tp1, new OffsetAndMetadata(1L));
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId, 5, "member", Optional.of("instance"));
        TransactionalRequestResult sendOffsets = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, 13131L, (short) 1);
        sender.runOnce();
        Assertions.assertFalse(sendOffsets.isCompleted());

        prepareFindCoordinatorResponse(Errors.NONE, false, groupType, groupId);
        prepareTxnCommitResponse.run();
        Assertions.assertNull(transactionManager.coordinator(groupType));

        // Two sender iterations are run before the coordinator is expected to be known.
        sender.runOnce();
        sender.runOnce();
        Assertions.assertNotNull(transactionManager.coordinator(groupType));
        Assertions.assertTrue(transactionManager.hasPendingOffsetCommits());
        return sendOffsets;
    }

    /**
     * While {@code tp0} and {@code tp1} are only pending addition to the transaction,
     * sending to them is not allowed and draining the accumulator must return empty
     * batch lists for both nodes without raising an error.
     */
    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        appendToAccumulator(tp0);
        transactionManager.maybeAddPartition(tp1);
        appendToAccumulator(tp1);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp1));

        // Two-node cluster, one partition of topic "test" led by each node.
        Node firstNode = new Node(0, "localhost", 1111);
        Node secondNode = new Node(1, "localhost", 1112);
        PartitionInfo firstPartition = new PartitionInfo("test", 0, firstNode, null, null);
        PartitionInfo secondPartition = new PartitionInfo("test", 1, secondNode, null, null);
        Cluster cluster = new Cluster(null, Arrays.asList(firstNode, secondNode),
            Arrays.asList(firstPartition, secondPartition), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<>(Arrays.asList(firstNode, secondNode));

        Map<?, ?> drained = accumulator.drain(cluster, nodes, Integer.MAX_VALUE, time.milliseconds());

        Assertions.assertTrue(drained.containsKey(firstNode.id()));
        Assertions.assertTrue(((List<?>) drained.get(firstNode.id())).isEmpty());
        Assertions.assertTrue(drained.containsKey(secondNode.id()));
        Assertions.assertTrue(((List<?>) drained.get(secondNode.id())).isEmpty());
        Assertions.assertFalse(transactionManager.hasError());
    }

    /**
     * After {@code tp1} was added successfully and adding {@code tp0} failed with
     * TOPIC_AUTHORIZATION_FAILED (abortable error), sending to {@code tp1} must still
     * be allowed and a record appended to it must be drained.
     */
    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(tp1);
        prepareAddPartitionsToTxn(tp1, Errors.NONE);
        runUntil(() -> transactionManager.transactionContainsPartition(tp1));

        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        runUntil(transactionManager::hasAbortableError);
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));

        // Single-node cluster leading partition 1 of topic "test".
        Node leader = new Node(1, "localhost", 1112);
        PartitionInfo partition = new PartitionInfo("test", 1, leader, null, null);
        Cluster cluster = new Cluster(null, Collections.singletonList(leader),
            Collections.singletonList(partition), Collections.emptySet(), Collections.emptySet());

        appendToAccumulator(tp1);
        Map<?, ?> drained = accumulator.drain(cluster, Collections.singleton(leader), Integer.MAX_VALUE, time.milliseconds());

        Assertions.assertTrue(drained.containsKey(leader.id()));
        Assertions.assertEquals(1, ((List<?>) drained.get(leader.id())).size());
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Appends a record to {@code tp0} without calling {@code maybeAddPartition} first
     * and checks that draining returns an empty batch list for the partition's node.
     */
    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        appendToAccumulator(tp0);

        // Single-node cluster leading partition 0 of topic "test".
        Node leader = new Node(0, "localhost", 1111);
        PartitionInfo partition = new PartitionInfo("test", 0, leader, null, null);
        Cluster cluster = new Cluster(null, Collections.singletonList(leader),
            Collections.singletonList(partition), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<>(Collections.singletonList(leader));

        Map<?, ?> drained = accumulator.drain(cluster, nodes, Integer.MAX_VALUE, time.milliseconds());

        Assertions.assertTrue(drained.containsKey(leader.id()));
        Assertions.assertTrue(((List<?>) drained.get(leader.id())).isEmpty());
    }

    /**
     * After a produce response carrying NOT_LEADER_OR_FOLLOWER and a transition to the
     * abortable error state, a second, successful produce response must still complete
     * the record future with a non-null result.
     */
    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        final long pid = 13131L;
        final short producerEpoch = 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata produceFuture = appendToAccumulator(tp0);

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, pid, producerEpoch);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(produceFuture.isDone());

        transactionManager.transitionToAbortableError(new KafkaException());

        prepareProduceResponse(Errors.NONE, pid, producerEpoch);
        runUntil(produceFuture::isDone);
        Assertions.assertNotNull(produceFuture.get());
    }

    /**
     * Checks that when a queued batch for {@code tp0} expires instead of being sent, its future
     * fails with an {@link ExecutionException} caused by a {@link TimeoutException} and the
     * transaction manager reports an abortable error.
     */
    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(responseFuture.isDone());

        // tp0 only becomes part of the transaction (and sendable) once the prepared
        // AddPartitionsToTxn response has been processed.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(responseFuture.isDone());

        // Advance the test clock by 10 seconds, then disconnect and back off the target node so
        // the queued batch is expired rather than sent.
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        client.disconnect(clusterNode.idString());
        client.backoff(clusterNode, 100L);
        runUntil(responseFuture::isDone);

        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> responseFuture.get(),
                "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Same scenario as the single-batch expiry test, but with one queued batch each for
     * {@code tp0} and {@code tp1}: both futures must fail with an {@link ExecutionException}
     * caused by a {@link TimeoutException}, and the transaction manager must report an
     * abortable error.
     */
    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        transactionManager.maybeAddPartition(tp1);
        FutureRecordMetadata firstBatchResponse = appendToAccumulator(tp0);
        FutureRecordMetadata secondBatchResponse = appendToAccumulator(tp1);
        Assertions.assertFalse(firstBatchResponse.isDone());
        Assertions.assertFalse(secondBatchResponse.isDone());

        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, Errors.NONE);
        prepareAddPartitionsToTxn(partitionErrors);

        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        // Both partitions were added by the same response, so both must now be sendable.
        Assertions.assertTrue(transactionManager.transactionContainsPartition(tp1));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
        Assertions.assertFalse(firstBatchResponse.isDone());
        Assertions.assertFalse(secondBatchResponse.isDone());

        // Advance the test clock by 10 seconds, then disconnect and back off the target node so
        // the queued batches are expired rather than sent.
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        client.disconnect(clusterNode.idString());
        client.backoff(clusterNode, 100L);
        runUntil(firstBatchResponse::isDone);
        runUntil(secondBatchResponse::isDone);

        ExecutionException firstFailure = Assertions.assertThrows(
                ExecutionException.class,
                () -> firstBatchResponse.get(),
                "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(firstFailure.getCause() instanceof TimeoutException);

        ExecutionException secondFailure = Assertions.assertThrows(
                ExecutionException.class,
                () -> secondBatchResponse.get(),
                "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(secondFailure.getCause() instanceof TimeoutException);

        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Begins a commit while a batch for {@code tp0} is still queued, lets that batch expire, and
     * checks that the commit fails with a {@link TimeoutException}, that the transaction stays
     * open in an abortable error state, and that a subsequent abort completes successfully.
     */
    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(responseFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Advance the test clock by 10 seconds and disconnect the target node so the queued
        // batch is expired rather than sent.
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        client.disconnect(clusterNode.idString());
        runUntil(responseFuture::isDone);

        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> responseFuture.get(),
                "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);

        // The commit completes unsuccessfully, but the transaction itself is still ongoing.
        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertThrows(TimeoutException.class, () -> commitResult.await());
        Assertions.assertTrue(transactionManager.hasAbortableError());
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(transactionManager.isCompleting());
        Assertions.assertTrue(transactionManager.transactionContainsPartition(tp0));

        // Aborting is answered by an EndTxn response followed by an InitProducerId response
        // carrying epoch 2; afterwards no transaction state for tp0 remains.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * With node "0" advertising INIT_PRODUCER_ID versions 0-1 and PRODUCE versions 0-7, checks
     * that when a batch that already failed once with {@code NOT_LEADER_OR_FOLLOWER} expires
     * during a commit, the commit fails and the transaction manager ends up in a fatal error
     * state with no ongoing transaction.
     */
    @Test
    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
        ApiVersionsResponseData.ApiVersion initProducerIdVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1);
        ApiVersionsResponseData.ApiVersion produceVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7);
        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(initProducerIdVersions, produceVersions)));

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));

        // The first produce attempt is answered with NOT_LEADER_OR_FOLLOWER; the future stays pending.
        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, 13131L, (short) 1);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(responseFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Advance the test clock by 10 seconds, then disconnect and back off the target node so
        // the queued batch is expired rather than sent.
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        client.disconnect(clusterNode.idString());
        client.backoff(clusterNode, 100L);
        runUntil(responseFuture::isDone);

        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> responseFuture.get(),
                "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);

        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertTrue(transactionManager.hasFatalError());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Idempotent producer (id 15, epoch 5): a successfully acknowledged batch that is marked
     * unresolved resolves without changing the producer id/epoch, whereas a batch that fails
     * with a {@link TimeoutException} ends with the epoch reaching 6.
     */
    @Test
    public void testBumpEpochAfterTimeoutWithoutPendingInflightRequests() {
        initializeTransactionManager(Optional.empty());
        long producerId = 15L;
        short epoch = 5;
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        initializeIdempotentProducerId(producerId, epoch);

        // With nothing outstanding, asking for a bump/reset leaves the id and epoch untouched.
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());

        TopicPartition tp0 = new TopicPartition("foo", 0);
        Assertions.assertEquals((Integer) 0, (Integer) transactionManager.sequenceNumber(tp0));

        // First batch: acknowledged successfully, so its sequence resolves without an epoch change.
        ProducerBatch ackedBatch = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        Assertions.assertEquals((Integer) 1, (Integer) transactionManager.sequenceNumber(tp0));
        ProduceResponse.PartitionResponse success =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(ackedBatch, success);
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
        transactionManager.markSequenceUnresolved(ackedBatch);
        transactionManager.maybeResolveSequences();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());

        // Second batch: fails with a timeout while marked unresolved.
        ProducerBatch timedOutBatch = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        Assertions.assertEquals((Integer) 2, (Integer) transactionManager.sequenceNumber(tp0));
        transactionManager.markSequenceUnresolved(timedOutBatch);
        transactionManager.handleFailedBatch(timedOutBatch, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());
        transactionManager.maybeResolveSequences();
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());

        // Drive the test loop until the epoch has advanced from 5 to 6.
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 6);
    }

    /**
     * Idempotent producer (id 15, epoch 5) with three batches written to one partition: the
     * first two fail with {@link TimeoutException}, the third is acknowledged. The producer
     * id/epoch must never change and the next sequence number must stay at 3.
     */
    @Test
    public void testNoProducerIdResetAfterLastInFlightBatchSucceeds() {
        initializeTransactionManager(Optional.empty());
        long producerId = 15L;
        short epoch = 5;
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        initializeIdempotentProducerId(producerId, epoch);

        TopicPartition tp0 = new TopicPartition("foo", 0);
        ProducerBatch first = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        ProducerBatch third = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        Assertions.assertEquals((int) 3, (int) transactionManager.sequenceNumber(tp0));

        // First batch times out: sequences are unresolved, but id and epoch are unchanged.
        transactionManager.markSequenceUnresolved(first);
        transactionManager.handleFailedBatch(first, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Second batch times out as well: still unresolved, still no id/epoch change.
        transactionManager.handleFailedBatch(second, new TimeoutException(), false);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Third batch succeeds: sequences resolve with the original id/epoch and sequence 3.
        ProduceResponse.PartitionResponse success =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(third, success);
        transactionManager.maybeResolveSequences();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());
        Assertions.assertEquals((int) 3, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Idempotent producer (id 13131, epoch 1) with three batches written to one partition: the
     * first times out, the second is acknowledged, and the third times out. Only after the last
     * batch has failed does the epoch advance to 2, at which point the sequences are resolved
     * and the partition's sequence number is back at 0.
     */
    @Test
    public void testEpochBumpAfterLastInflightBatchFails() {
        initializeTransactionManager(Optional.empty());
        // The epoch argument is a short; an unadorned int literal would not compile here.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(13131L, (short) 1);
        initializeIdempotentProducerId(13131L, (short) 1);

        TopicPartition tp0 = new TopicPartition("foo", 0);
        ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        Assertions.assertEquals((Integer) 3, (Integer) transactionManager.sequenceNumber(tp0));

        // First batch times out and leaves unresolved sequences behind.
        transactionManager.markSequenceUnresolved(b1);
        transactionManager.handleFailedBatch(b1, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Second batch succeeds; with the third still outstanding nothing is bumped yet.
        transactionManager.handleCompletedBatch(
                b2, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(producerIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Third batch fails: run until the epoch reaches 2, then verify the reset state.
        transactionManager.handleFailedBatch(b3, new TimeoutException(), false);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());
        Assertions.assertEquals((int) 0, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Idempotent producer (id 15, epoch 5): a batch failing with
     * {@link OutOfOrderSequenceException} bumps the epoch to 6, but once the transaction manager
     * has been put into a fatal error state a further failed batch no longer changes the
     * producer id/epoch.
     */
    @Test
    public void testNoFailedBatchHandlingWhenTxnManagerIsInFatalError() {
        initializeTransactionManager(Optional.empty());
        long producerId = 15L;
        short epoch = 5;
        initializeIdempotentProducerId(producerId, epoch);
        TopicPartition tp0 = new TopicPartition("foo", 0);

        // Before the fatal error: the failed batch results in epoch + 1.
        ProducerBatch firstBatch = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        transactionManager.handleFailedBatch(firstBatch, new OutOfOrderSequenceException("out of sequence"), false);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        ProducerIdAndEpoch bumpedOnce = new ProducerIdAndEpoch(producerId, (short) (epoch + 1));
        Assertions.assertEquals(bumpedOnce, transactionManager.producerIdAndEpoch());

        // After the fatal error: another failed batch leaves id and epoch as they were.
        transactionManager.transitionToFatalError(new KafkaException());
        ProducerBatch secondBatch = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        transactionManager.handleFailedBatch(secondBatch, new TimeoutException(), true);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(bumpedOnce, transactionManager.producerIdAndEpoch());
    }

    /**
     * With node "0" advertising INIT_PRODUCER_ID versions 0-1 and PRODUCE versions 0-7: two
     * records are produced successfully to {@code tp0}, a third fails with
     * {@code TOPIC_AUTHORIZATION_FAILED}, and the transaction is aborted. After a new transaction
     * re-adds {@code tp0}, its sequence number must be 2.
     */
    @Test
    public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
        ApiVersionsResponseData.ApiVersion initProducerIdVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1);
        ApiVersionsResponseData.ApiVersion produceVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7);
        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(initProducerIdVersions, produceVersions)));

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // First record: partition gets added and the produce succeeds.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        runUntil(firstSend::isDone);

        // Second record: succeeds as well.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(secondSend::isDone);

        // Third record: rejected with TOPIC_AUTHORIZATION_FAILED, which makes the transaction abortable.
        FutureRecordMetadata thirdSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, (short) 1);
        runUntil(thirdSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction on the same partition: the sequence number continues at 2.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals((int) 2, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * With node "0" advertising INIT_PRODUCER_ID versions 0-1 and PRODUCE versions 0-7: one
     * record is produced successfully to {@code tp1} and two to {@code tp0}, then a third
     * {@code tp0} record fails with {@code UNKNOWN_PRODUCER_ID} and the transaction is aborted.
     * After a new transaction re-adds {@code tp0}, the sequence number of {@code tp0} must be 0
     * while that of {@code tp1} must still be 1.
     */
    @Test
    public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() throws InterruptedException {
        ApiVersionsResponseData.ApiVersion initProducerIdVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1);
        ApiVersionsResponseData.ApiVersion produceVersions = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7);
        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(initProducerIdVersions, produceVersions)));

        doInitTransactions();
        transactionManager.beginTransaction();

        // One successful record on tp1.
        transactionManager.maybeAddPartition(tp1);
        FutureRecordMetadata tp1Send = appendToAccumulator(tp1);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1, tp1);
        runUntil(tp1Send::isDone);
        Assertions.assertTrue(transactionManager.isPartitionAdded(tp1));

        // Two successful records on tp0.
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata tp0FirstSend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(tp0FirstSend::isDone);
        Assertions.assertTrue(transactionManager.isPartitionAdded(tp0));

        FutureRecordMetadata tp0SecondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(tp0SecondSend::isDone);

        // Third tp0 record is answered with UNKNOWN_PRODUCER_ID, making the transaction abortable.
        FutureRecordMetadata tp0ThirdSend = appendToAccumulator(tp0);
        client.prepareResponse(
                produceRequestMatcher(13131L, (short) 1, tp0),
                (AbstractResponse) produceResponse(tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        runUntil(tp0ThirdSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction: tp0 starts over at sequence 0, tp1 keeps its sequence of 1.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals((int) 0, (int) transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals((int) 1, (int) transactionManager.sequenceNumber(tp1));
    }

    /**
     * Starts a transactional producer at {@code initialEpoch}, produces two records to
     * {@code tp0} successfully, then fails a third with {@code TOPIC_AUTHORIZATION_FAILED}.
     * Aborting is answered with an InitProducerId response carrying {@code bumpedEpoch}; the test
     * checks the epoch reaches that value and that, in the next transaction, the sequence number
     * of {@code tp0} is 0.
     */
    @Test
    public void testBumpTransactionalEpochOnAbortableError() throws InterruptedException {
        final short initialEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions(13131L, initialEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, initialEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        FutureRecordMetadata responseFuture0 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture0::isDone);

        FutureRecordMetadata responseFuture1 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture1::isDone);

        // The third record is rejected, which puts the transaction into an abortable error.
        FutureRecordMetadata responseFuture2 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, initialEpoch);
        runUntil(responseFuture2::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        // Abort: EndTxn is answered at the initial epoch, InitProducerId returns the bumped one.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, initialEpoch);
        prepareInitPidResponse(Errors.NONE, false, 13131L, bumpedEpoch);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == bumpedEpoch);
        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // Next transaction runs at the bumped epoch and starts tp0 at sequence 0.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals((int) 0, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Starts a transactional producer at {@code initialEpoch}, produces two records to
     * {@code tp0} successfully, then fails a third with {@code UNKNOWN_PRODUCER_ID}. Aborting is
     * answered with an InitProducerId response carrying {@code bumpedEpoch}; the test checks the
     * epoch reaches that value and that, in the next transaction, the sequence number of
     * {@code tp0} is 0.
     */
    @Test
    public void testBumpTransactionalEpochOnUnknownProducerIdError() throws InterruptedException {
        final short initialEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions(13131L, initialEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, initialEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        FutureRecordMetadata responseFuture0 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture0::isDone);

        FutureRecordMetadata responseFuture1 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture1::isDone);

        // The third record is answered with UNKNOWN_PRODUCER_ID, making the transaction abortable.
        FutureRecordMetadata responseFuture2 = appendToAccumulator(tp0);
        client.prepareResponse(
                produceRequestMatcher(13131L, initialEpoch, tp0),
                (AbstractResponse) produceResponse(tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        runUntil(responseFuture2::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        // Abort: EndTxn is answered at the initial epoch, InitProducerId returns the bumped one.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, initialEpoch);
        prepareInitPidResponse(Errors.NONE, false, 13131L, bumpedEpoch);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == bumpedEpoch);
        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // Next transaction runs at the bumped epoch and starts tp0 at sequence 0.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals((int) 0, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Starts a transactional producer at {@code initialEpoch}, produces two records to
     * {@code tp0} successfully, then lets a third request go in flight and advances the clock
     * with the node disconnected until its future completes and the transaction is abortable.
     * Aborting is answered with FindCoordinator, EndTxn and an InitProducerId response carrying
     * {@code bumpedEpoch}; the test checks the epoch reaches that value and that, in the next
     * transaction, the sequence number of {@code tp0} is 0.
     */
    @Test
    public void testBumpTransactionalEpochOnTimeout() throws InterruptedException {
        final short initialEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions(13131L, initialEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, initialEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        FutureRecordMetadata responseFuture0 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture0::isDone);

        FutureRecordMetadata responseFuture1 = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture1::isDone);

        // No response is prepared for the third record: wait until a request is in flight, then
        // advance the test clock by 10 seconds and disconnect/back off the node.
        FutureRecordMetadata responseFuture2 = appendToAccumulator(tp0);
        runUntil(client::hasInFlightRequests);
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        client.disconnect(clusterNode.idString());
        client.backoff(clusterNode, 100L);
        runUntil(responseFuture2::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.runOnce();
        // Sleeps 110 ms, i.e. longer than the 100 ms backoff set above
        // (presumably so the node is usable again; confirm against MockClient.backoff).
        time.sleep(110L);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, initialEpoch);
        prepareInitPidResponse(Errors.NONE, false, 13131L, bumpedEpoch);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == bumpedEpoch);
        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // Next transaction runs at the bumped epoch and starts tp0 at sequence 0.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals((int) 0, (int) transactionManager.sequenceNumber(tp0));
    }

    /**
     * Starts a transactional producer at {@code initialEpoch} and answers the AddPartitionsToTxn
     * request for {@code tp0} with {@code INVALID_PRODUCER_ID_MAPPING}. The resulting abortable
     * error is followed by an abort that is answered only with an InitProducerId response
     * carrying {@code bumpedEpoch}; the abort must succeed with the producer at that epoch and
     * the transaction manager ready again.
     */
    @Test
    public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
        final short initialEpoch = 1;
        final short bumpedEpoch = 2;

        doInitTransactions(13131L, initialEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, tp0, initialEpoch, 13131L);
        runUntil(transactionManager::hasAbortableError);

        // No EndTxn response is prepared here; only the InitProducerId response is queued.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareInitPidResponse(Errors.NONE, false, 13131L, bumpedEpoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * Starts a transactional producer at {@code initialEpoch}, produces one record to
     * {@code tp0}, then sends offsets to the transaction and answers the AddOffsetsToTxn request
     * with {@code INVALID_PRODUCER_ID_MAPPING}. The resulting abortable error is followed by an
     * abort answered with EndTxn and an InitProducerId response carrying {@code bumpedEpoch}; the
     * abort must succeed with the producer at that epoch and the transaction manager ready again.
     */
    @Test
    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
        final short initialEpoch = 1;
        final short bumpedEpoch = 2;
        final String consumerGroupId = "myConsumerGroup";

        doInitTransactions(13131L, initialEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(responseFuture.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, initialEpoch, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, initialEpoch);
        runUntil(responseFuture::isDone);

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
        Assertions.assertFalse(transactionManager.hasPendingOffsetCommits());

        // The AddOffsetsToTxn request fails, which puts the transaction into an abortable error.
        prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, consumerGroupId, 13131L, initialEpoch);
        runUntil(transactionManager::hasAbortableError);

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, initialEpoch);
        prepareInitPidResponse(Errors.NONE, false, 13131L, bumpedEpoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * Walks two partitions through a producer epoch change from 1 to 2: tp0 hits
     * {@code UNKNOWN_PRODUCER_ID} and has its pending batch re-sequenced under the new epoch,
     * while tp1's already in-flight batch keeps its original epoch and sequence, is retried
     * after a {@code NOT_LEADER_OR_FOLLOWER} error, and only afterwards does tp1 start sending
     * batches under the new epoch.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testHealthyPartitionRetriesDuringEpochBump() throws InterruptedException {
        this.initializeTransactionManager(Optional.empty());
        // A separate Sender instance held in a local; note that runUntil() drives this.sender, not this local.
        Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, Integer.MAX_VALUE, new SenderMetricsRegistry(new Metrics((Time)this.time)), (Time)this.time, 1000, 50L, this.transactionManager, this.apiVersions);
        this.initializeIdempotentProducerId(13131L, (short)1);
        // Three batches on tp0 and two on tp1, so the next sequence numbers are 3 and 2 respectively.
        ProducerBatch tp0b1 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch tp0b2 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "3");
        ProducerBatch tp1b1 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "4");
        ProducerBatch tp1b2 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "5");
        Assertions.assertEquals((int)3, (int)this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals((int)2, (int)this.transactionManager.sequenceNumber(this.tp1));
        // Complete the first batch of each partition successfully.
        long b1AppendTime = this.time.milliseconds();
        ProduceResponse.PartitionResponse t0b1Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp0b1.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp0b1, t0b1Response);
        ProduceResponse.PartitionResponse t1b1Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b1.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b1, t1b1Response);
        // The second tp0 batch gets UNKNOWN_PRODUCER_ID; it must be retriable, and the epoch is then expected to reach 2.
        ProduceResponse.PartitionResponse t0b2Response = new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue((boolean)this.transactionManager.canRetry(t0b2Response, tp0b2));
        this.runUntil(() -> this.transactionManager.producerIdAndEpoch().epoch == 2);
        // tp0: the pending batch now starts at sequence 0 and carries epoch 2.
        Assertions.assertEquals((Object)tp0b2, (Object)this.transactionManager.nextBatchBySequence(this.tp0));
        Assertions.assertEquals((int)0, (int)this.transactionManager.firstInFlightSequence(this.tp0));
        Assertions.assertEquals((int)0, (int)tp0b2.baseSequence());
        Assertions.assertTrue((boolean)tp0b2.sequenceHasBeenReset());
        Assertions.assertEquals((int)2, (int)tp0b2.producerEpoch());
        // tp1: the in-flight batch is untouched, still at sequence 1 under epoch 1.
        Assertions.assertEquals((Object)tp1b2, (Object)this.transactionManager.nextBatchBySequence(this.tp1));
        Assertions.assertEquals((int)1, (int)this.transactionManager.firstInFlightSequence(this.tp1));
        Assertions.assertEquals((int)1, (int)tp1b2.baseSequence());
        Assertions.assertFalse((boolean)tp1b2.sequenceHasBeenReset());
        Assertions.assertEquals((int)1, (int)tp1b2.producerEpoch());
        // A newly appended tp1 record is expected to remain queued after one sender pass.
        this.appendToAccumulator(this.tp1);
        sender.runOnce();
        Assertions.assertEquals((int)1, (int)this.accumulator.getDeque(this.tp1).size());
        // The old-epoch tp1 batch fails with a retriable error and is re-enqueued.
        ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, 600L);
        Assertions.assertTrue((boolean)this.transactionManager.canRetry(t1b2Response, tp1b2));
        this.accumulator.reenqueue(tp1b2, this.time.milliseconds());
        sender.runOnce();
        // After the pass, one batch remains queued, it is not tp1b2, and tp1b2 still carries epoch 1.
        Assertions.assertEquals((int)1, (int)this.accumulator.getDeque(this.tp1).size());
        Assertions.assertNotEquals((Object)tp1b2, this.accumulator.getDeque(this.tp1).peek());
        Assertions.assertEquals((short)1, (short)tp1b2.producerEpoch());
        // Complete the old-epoch batch; tp1 then has nothing in flight and its sequence number is back at 0.
        t1b2Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b2.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
        this.transactionManager.maybeUpdateProducerIdAndEpoch(this.tp1);
        Assertions.assertFalse((boolean)this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertEquals((int)0, (int)this.transactionManager.sequenceNumber(this.tp1));
        // Wait for the queued tp1 batch to go in flight; it must carry the new epoch 2.
        this.runUntil(() -> this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertTrue((boolean)this.accumulator.getDeque(this.tp1).isEmpty());
        ProducerBatch tp1b3 = this.transactionManager.nextBatchBySequence(this.tp1);
        Assertions.assertEquals((int)2, (int)tp1b3.producerEpoch());
        // Completing that batch advances tp1's next sequence number to 1.
        ProduceResponse.PartitionResponse t1b3Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b3.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
        this.transactionManager.maybeUpdateProducerIdAndEpoch(this.tp1);
        Assertions.assertFalse((boolean)this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertEquals((int)1, (int)this.transactionManager.sequenceNumber(this.tp1));
    }

    /**
     * Runs the retry scenario with an abort as both the first attempt and the retry.
     *
     * @throws InterruptedException if the scenario helper is interrupted
     */
    @Test
    public void testRetryAbortTransaction() throws InterruptedException {
        TransactionResult outcome = TransactionResult.ABORT;
        this.verifyCommitOrAbortTransactionRetriable(outcome, outcome);
    }

    /**
     * Runs the retry scenario with a commit as both the first attempt and the retry.
     *
     * @throws InterruptedException if the scenario helper is interrupted
     */
    @Test
    public void testRetryCommitTransaction() throws InterruptedException {
        TransactionResult outcome = TransactionResult.COMMIT;
        this.verifyCommitOrAbortTransactionRetriable(outcome, outcome);
    }

    /**
     * Expects an {@link IllegalStateException} when a commit attempt is followed by an abort
     * as the retry.
     */
    @Test
    public void testRetryAbortTransactionAfterCommitTimeout() {
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.verifyCommitOrAbortTransactionRetriable(TransactionResult.COMMIT, TransactionResult.ABORT);
        });
    }

    /**
     * Expects an {@link IllegalStateException} when an abort attempt is followed by a commit
     * as the retry.
     */
    @Test
    public void testRetryCommitTransactionAfterAbortTimeout() {
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.verifyCommitOrAbortTransactionRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
        });
    }

    /**
     * Checks that {@code canBumpEpoch()} stays true after the transaction coordinator's entry
     * has been removed from {@code apiVersions}.
     */
    @Test
    public void testCanBumpEpochDuringCoordinatorDisconnect() {
        FindCoordinatorRequest.CoordinatorType txnCoordinatorType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        this.doInitTransactions(0L, (short) 0);
        this.runUntil(() -> this.transactionManager.coordinator(txnCoordinatorType) != null);
        Assertions.assertTrue(this.transactionManager.canBumpEpoch());

        // Drop the coordinator's API version information and re-check.
        String coordinatorId = this.transactionManager.coordinator(txnCoordinatorType).idString();
        this.apiVersions.remove(coordinatorId);
        Assertions.assertTrue(this.transactionManager.canBumpEpoch());
    }

    /**
     * Walks two partitions through a producer epoch change from 1 to 2 and checks the sequence
     * and epoch bookkeeping of the batches on each partition before and after the change.
     *
     * NOTE(review): despite the method name, no batch is failed permanently in this body; the
     * tp1 batch receives a retriable {@code NOT_LEADER_OR_FOLLOWER} error and is later completed
     * successfully. Confirm this matches the intended scenario.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testFailedInflightBatchAfterEpochBump() throws InterruptedException {
        this.initializeTransactionManager(Optional.empty());
        // A separate Sender instance held in a local; note that runUntil() drives this.sender, not this local.
        Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, Integer.MAX_VALUE, new SenderMetricsRegistry(new Metrics((Time)this.time)), (Time)this.time, 1000, 50L, this.transactionManager, this.apiVersions);
        this.initializeIdempotentProducerId(13131L, (short)1);
        // Three batches on tp0 and two on tp1, so the next sequence numbers are 3 and 2 respectively.
        ProducerBatch tp0b1 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "1");
        ProducerBatch tp0b2 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "2");
        this.writeIdempotentBatchWithValue(this.transactionManager, this.tp0, "3");
        ProducerBatch tp1b1 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "4");
        ProducerBatch tp1b2 = this.writeIdempotentBatchWithValue(this.transactionManager, this.tp1, "5");
        Assertions.assertEquals((int)3, (int)this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals((int)2, (int)this.transactionManager.sequenceNumber(this.tp1));
        // Complete the first batch of each partition successfully.
        long b1AppendTime = this.time.milliseconds();
        ProduceResponse.PartitionResponse t0b1Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp0b1.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp0b1, t0b1Response);
        ProduceResponse.PartitionResponse t1b1Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b1.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b1, t1b1Response);
        // The second tp0 batch gets UNKNOWN_PRODUCER_ID; it must be retriable, and the epoch is then expected to reach 2.
        ProduceResponse.PartitionResponse t0b2Response = new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue((boolean)this.transactionManager.canRetry(t0b2Response, tp0b2));
        this.runUntil(() -> this.transactionManager.producerIdAndEpoch().epoch == 2);
        // tp0: the pending batch now starts at sequence 0 and carries epoch 2.
        Assertions.assertEquals((Object)tp0b2, (Object)this.transactionManager.nextBatchBySequence(this.tp0));
        Assertions.assertEquals((int)0, (int)this.transactionManager.firstInFlightSequence(this.tp0));
        Assertions.assertEquals((int)0, (int)tp0b2.baseSequence());
        Assertions.assertTrue((boolean)tp0b2.sequenceHasBeenReset());
        Assertions.assertEquals((int)2, (int)tp0b2.producerEpoch());
        // tp1: the in-flight batch is untouched, still at sequence 1 under epoch 1.
        Assertions.assertEquals((Object)tp1b2, (Object)this.transactionManager.nextBatchBySequence(this.tp1));
        Assertions.assertEquals((int)1, (int)this.transactionManager.firstInFlightSequence(this.tp1));
        Assertions.assertEquals((int)1, (int)tp1b2.baseSequence());
        Assertions.assertFalse((boolean)tp1b2.sequenceHasBeenReset());
        Assertions.assertEquals((int)1, (int)tp1b2.producerEpoch());
        // A newly appended tp1 record is expected to remain queued after one sender pass.
        this.appendToAccumulator(this.tp1);
        sender.runOnce();
        Assertions.assertEquals((int)1, (int)this.accumulator.getDeque(this.tp1).size());
        // The old-epoch tp1 batch fails with a retriable error and is re-enqueued.
        ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, 600L);
        Assertions.assertTrue((boolean)this.transactionManager.canRetry(t1b2Response, tp1b2));
        this.accumulator.reenqueue(tp1b2, this.time.milliseconds());
        sender.runOnce();
        // After the pass, one batch remains queued, it is not tp1b2, and tp1b2 still carries epoch 1.
        Assertions.assertEquals((int)1, (int)this.accumulator.getDeque(this.tp1).size());
        Assertions.assertNotEquals((Object)tp1b2, this.accumulator.getDeque(this.tp1).peek());
        Assertions.assertEquals((short)1, (short)tp1b2.producerEpoch());
        // Complete the old-epoch batch; tp1 then has nothing in flight and its sequence number is back at 0.
        t1b2Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b2.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b2, t1b2Response);
        this.transactionManager.maybeUpdateProducerIdAndEpoch(this.tp1);
        Assertions.assertFalse((boolean)this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertEquals((int)0, (int)this.transactionManager.sequenceNumber(this.tp1));
        // Wait for the queued tp1 batch to go in flight; it must carry the new epoch 2.
        this.runUntil(() -> this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertTrue((boolean)this.accumulator.getDeque(this.tp1).isEmpty());
        ProducerBatch tp1b3 = this.transactionManager.nextBatchBySequence(this.tp1);
        Assertions.assertEquals((int)2, (int)tp1b3.producerEpoch());
        // Completing that batch advances tp1's next sequence number to 1.
        ProduceResponse.PartitionResponse t1b3Response = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        tp1b3.complete(500L, b1AppendTime);
        this.transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
        Assertions.assertFalse((boolean)this.transactionManager.hasInflightBatches(this.tp1));
        Assertions.assertEquals((int)1, (int)this.transactionManager.sequenceNumber(this.tp1));
    }

    /**
     * Appends a single fixed {@code "key"}/{@code "value"} record for the given partition to the
     * accumulator, using the mock clock's current time for both timestamp arguments.
     *
     * @param tp the partition to append to
     * @return the future exposed by the accumulator's append result
     * @throws InterruptedException if the append is interrupted
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
        long nowMs = this.time.milliseconds();
        // Encode explicitly as UTF-8 so the payload bytes do not depend on the platform default charset.
        byte[] key = "key".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return this.accumulator.append(tp.topic(), tp.partition(), nowMs, key, value, Record.EMPTY_HEADERS, null, 1000L, false, nowMs, TestUtils.singletonCluster()).future;
    }

    /**
     * Shared scenario for the commit/abort retry tests: starts a transaction, produces one record,
     * begins the first commit or abort whose EndTxn response is delivered with a disconnect,
     * checks that awaiting the result times out, rediscovers the coordinator, and then begins
     * the retry, which must return the same result object and eventually complete.
     *
     * @param firstTransactionResult whether the first attempt is a commit or an abort
     * @param retryTransactionResult whether the retry is a commit or an abort
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult, TransactionResult retryTransactionResult) throws InterruptedException {
        final long producerId = 13131L;
        final short epoch = 1;
        Supplier<Boolean> noPendingResponses = () -> !this.client.hasPendingResponses();

        this.doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        this.appendToAccumulator(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, producerId);
        this.prepareProduceResponse(Errors.NONE, producerId, epoch);
        this.runUntil(noPendingResponses);

        // First attempt: the EndTxn response is prepared with shouldDisconnect = true.
        final TransactionalRequestResult result;
        if (firstTransactionResult == TransactionResult.COMMIT) {
            result = this.transactionManager.beginCommit();
        } else {
            result = this.transactionManager.beginAbort();
        }
        this.prepareEndTxnResponse(Errors.NONE, firstTransactionResult, producerId, epoch, true);
        this.runUntil(noPendingResponses);
        Assertions.assertFalse(result.isCompleted());
        Assertions.assertThrows(TimeoutException.class, () -> result.await(1000L, TimeUnit.MILLISECONDS));

        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.runUntil(noPendingResponses);

        // Retry: must hand back the very same result object as the first attempt.
        final TransactionalRequestResult retryResult;
        if (retryTransactionResult == TransactionResult.COMMIT) {
            retryResult = this.transactionManager.beginCommit();
        } else {
            retryResult = this.transactionManager.beginAbort();
        }
        Assertions.assertEquals((Object) retryResult, (Object) result);
        this.prepareEndTxnResponse(Errors.NONE, retryTransactionResult, producerId, epoch, false);
        this.runUntil(retryResult::isCompleted);
        Assertions.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Queues an AddPartitionsToTxn response carrying the given per-partition errors. The request
     * matcher asserts that the request names exactly the partitions that are keys of {@code errors}
     * (compared as sets, so order is ignored).
     *
     * @param errors the error to report for each partition
     */
    private void prepareAddPartitionsToTxn(Map<TopicPartition, Errors> errors) {
        this.client.prepareResponse(body -> {
            AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
            // Typed sets instead of raw HashSet; expected value goes first so failure messages read correctly.
            HashSet<TopicPartition> expectedPartitions = new HashSet<>(errors.keySet());
            HashSet<TopicPartition> requestedPartitions = new HashSet<>(request.partitions());
            Assertions.assertEquals(expectedPartitions, requestedPartitions);
            return true;
        }, (AbstractResponse) new AddPartitionsToTxnResponse(0, errors));
    }

    /**
     * Single-partition convenience overload: queues an AddPartitionsToTxn response reporting
     * {@code error} for {@code tp}.
     *
     * @param tp    the partition expected in the request
     * @param error the error to report for that partition
     */
    private void prepareAddPartitionsToTxn(TopicPartition tp, Errors error) {
        Map<TopicPartition, Errors> singleError = Collections.singletonMap(tp, error);
        this.prepareAddPartitionsToTxn(singleError);
    }

    /**
     * Queues a FindCoordinator response that points at {@code brokerNode}. The request matcher
     * asserts the coordinator type and the coordinator key; the key is read from the single-key
     * field when the request's key list is empty, otherwise from the first list entry.
     *
     * @param error            the error to put in the response
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     * @param coordinatorType  the coordinator type the request must ask for
     * @param coordinatorKey   the key the request must carry
     */
    private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect, FindCoordinatorRequest.CoordinatorType coordinatorType, String coordinatorKey) {
        MockClient.RequestMatcher matcher = body -> {
            FindCoordinatorRequest request = (FindCoordinatorRequest) body;
            Assertions.assertEquals((Object) coordinatorType, (Object) FindCoordinatorRequest.CoordinatorType.forId((byte) request.data().keyType()));
            String key;
            if (request.data().coordinatorKeys().isEmpty()) {
                key = request.data().key();
            } else {
                key = request.data().coordinatorKeys().get(0);
            }
            Assertions.assertEquals((Object) coordinatorKey, (Object) key);
            return true;
        };
        AbstractResponse response = FindCoordinatorResponse.prepareResponse(error, coordinatorKey, this.brokerNode);
        this.client.prepareResponse(matcher, response, shouldDisconnect);
    }

    /**
     * Queues an InitProducerId response with the given error, producer id and epoch. The request
     * matcher asserts that the request carries transactional id {@code "foobar"} and a
     * transaction timeout of 1121 ms.
     *
     * @param error            the error to put in the response
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     * @param producerId       the producer id to return
     * @param producerEpoch    the producer epoch to return
     */
    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData();
        responseData.setErrorCode(error.code());
        responseData.setProducerEpoch(producerEpoch);
        responseData.setProducerId(producerId);
        responseData.setThrottleTimeMs(0);

        MockClient.RequestMatcher matcher = body -> {
            InitProducerIdRequest request = (InitProducerIdRequest) body;
            Assertions.assertEquals((Object) "foobar", (Object) request.data().transactionalId());
            Assertions.assertEquals(1121, (int) request.data().transactionTimeoutMs());
            return true;
        };
        this.client.prepareResponse(matcher, new InitProducerIdResponse(responseData), shouldDisconnect);
    }

    /**
     * Responds to a produce request for {@code tp0}; see the four-argument overload.
     *
     * @param error         the error to report
     * @param producerId    the producer id the request's batch must carry
     * @param producerEpoch the producer epoch the request's batch must carry
     */
    private void sendProduceResponse(Errors error, long producerId, short producerEpoch) {
        TopicPartition defaultPartition = this.tp0;
        this.sendProduceResponse(error, producerId, producerEpoch, defaultPartition);
    }

    /**
     * Calls the mock client's {@code respond} with a produce response for {@code tp} (offset 0,
     * throttle time 0), guarded by the produce request matcher for the given producer id and epoch.
     *
     * @param error         the error to report for the partition
     * @param producerId    the producer id the request's batch must carry
     * @param producerEpoch the producer epoch the request's batch must carry
     * @param tp            the partition the response is for
     */
    private void sendProduceResponse(Errors error, long producerId, short producerEpoch, TopicPartition tp) {
        MockClient.RequestMatcher matcher = this.produceRequestMatcher(producerId, producerEpoch, tp);
        ProduceResponse response = this.produceResponse(tp, 0L, error, 0);
        this.client.respond(matcher, (AbstractResponse) response);
    }

    /**
     * Queues a produce response for {@code tp0}; see the four-argument overload.
     *
     * @param error         the error to report
     * @param producerId    the producer id the request's batch must carry
     * @param producerEpoch the producer epoch the request's batch must carry
     */
    private void prepareProduceResponse(Errors error, long producerId, short producerEpoch) {
        TopicPartition defaultPartition = this.tp0;
        this.prepareProduceResponse(error, producerId, producerEpoch, defaultPartition);
    }

    /**
     * Queues a produce response for {@code tp} (offset 0, throttle time 0) on the mock client,
     * guarded by the produce request matcher for the given producer id and epoch.
     *
     * @param error         the error to report for the partition
     * @param producerId    the producer id the request's batch must carry
     * @param producerEpoch the producer epoch the request's batch must carry
     * @param tp            the partition the response is for
     */
    private void prepareProduceResponse(Errors error, long producerId, short producerEpoch, TopicPartition tp) {
        MockClient.RequestMatcher matcher = this.produceRequestMatcher(producerId, producerEpoch, tp);
        ProduceResponse response = this.produceResponse(tp, 0L, error, 0);
        this.client.prepareResponse(matcher, (AbstractResponse) response);
    }

    /**
     * Builds a matcher for produce requests. The matcher locates the records for {@code tp} in
     * the request and asserts that they contain exactly one batch, that the batch is
     * transactional and carries the given producer id and epoch, and that the request's
     * transactional id is {@code "foobar"}.
     *
     * @param producerId the producer id the batch must carry
     * @param epoch      the producer epoch the batch must carry
     * @param tp         the partition whose records are inspected
     * @return a matcher that returns {@code true} when all assertions pass
     */
    private MockClient.RequestMatcher produceRequestMatcher(long producerId, short epoch, TopicPartition tp) {
        return body -> {
            ProduceRequest produceRequest = (ProduceRequest) body;
            // A request without the topic or partition surfaces as NoSuchElementException from Optional.get().
            MemoryRecords records = produceRequest.data().topicData().stream()
                    .filter(t -> t.name().equals(tp.topic()))
                    .findAny().get()
                    .partitionData().stream()
                    .filter(p -> p.index() == tp.partition())
                    .map(p -> (MemoryRecords) p.records())
                    .findAny().get();
            Assertions.assertNotNull(records);

            // Typed iterator instead of a raw Iterator plus downcast.
            Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
            Assertions.assertTrue(batchIterator.hasNext());
            MutableRecordBatch batch = batchIterator.next();
            Assertions.assertFalse(batchIterator.hasNext());

            Assertions.assertTrue(batch.isTransactional());
            Assertions.assertEquals(producerId, batch.producerId());
            Assertions.assertEquals(epoch, batch.producerEpoch());
            Assertions.assertEquals((Object) "foobar", (Object) produceRequest.transactionalId());
            return true;
        };
    }

    /**
     * Queues an AddPartitionsToTxn response reporting {@code error} for {@code topicPartition},
     * guarded by the add-partitions request matcher for the given epoch and producer id.
     *
     * @param error          the error to report for the partition
     * @param topicPartition the single partition expected in the request
     * @param epoch          the producer epoch the request must carry
     * @param producerId     the producer id the request must carry
     */
    private void prepareAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long producerId) {
        MockClient.RequestMatcher matcher = this.addPartitionsRequestMatcher(topicPartition, epoch, producerId);
        Map<TopicPartition, Errors> partitionErrors = Collections.singletonMap(topicPartition, error);
        this.client.prepareResponse(matcher, (AbstractResponse) new AddPartitionsToTxnResponse(0, partitionErrors));
    }

    /**
     * Calls the mock client's {@code respond} with an AddPartitionsToTxn response reporting
     * {@code error} for {@code topicPartition}, guarded by the add-partitions request matcher.
     *
     * @param error          the error to report for the partition
     * @param topicPartition the single partition expected in the request
     * @param epoch          the producer epoch the request must carry
     * @param producerId     the producer id the request must carry
     */
    private void sendAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long producerId) {
        MockClient.RequestMatcher matcher = this.addPartitionsRequestMatcher(topicPartition, epoch, producerId);
        Map<TopicPartition, Errors> partitionErrors = Collections.singletonMap(topicPartition, error);
        this.client.respond(matcher, (AbstractResponse) new AddPartitionsToTxnResponse(0, partitionErrors));
    }

    /**
     * Builds a matcher for AddPartitionsToTxn requests that asserts the producer id, the producer
     * epoch, that the request names exactly {@code topicPartition}, and that the transactional id
     * is {@code "foobar"}.
     *
     * @param topicPartition the single partition the request must name
     * @param epoch          the producer epoch the request must carry
     * @param producerId     the producer id the request must carry
     * @return a matcher that returns {@code true} when all assertions pass
     */
    private MockClient.RequestMatcher addPartitionsRequestMatcher(TopicPartition topicPartition, short epoch, long producerId) {
        return body -> {
            AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
            Assertions.assertEquals(producerId, request.data().producerId());
            Assertions.assertEquals(epoch, request.data().producerEpoch());
            List<TopicPartition> expectedPartitions = Collections.singletonList(topicPartition);
            Assertions.assertEquals((Object) expectedPartitions, (Object) request.partitions());
            Assertions.assertEquals((Object) "foobar", (Object) request.data().transactionalId());
            return true;
        };
    }

    /**
     * Queues an EndTxn response without a disconnect; see the five-argument overload.
     *
     * @param error      the error to put in the response
     * @param result     the commit/abort result the request must carry
     * @param producerId the producer id the request must carry
     * @param epoch      the producer epoch the request must carry
     */
    private void prepareEndTxnResponse(Errors error, TransactionResult result, long producerId, short epoch) {
        final boolean shouldDisconnect = false;
        this.prepareEndTxnResponse(error, result, producerId, epoch, shouldDisconnect);
    }

    /**
     * Queues an EndTxn response (given error code, throttle time 0) on the mock client, guarded
     * by the EndTxn request matcher.
     *
     * @param error            the error to put in the response
     * @param result           the commit/abort result the request must carry
     * @param producerId       the producer id the request must carry
     * @param epoch            the producer epoch the request must carry
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     */
    private void prepareEndTxnResponse(Errors error, TransactionResult result, long producerId, short epoch, boolean shouldDisconnect) {
        MockClient.RequestMatcher matcher = this.endTxnMatcher(result, producerId, epoch);
        EndTxnResponseData responseData = new EndTxnResponseData();
        responseData.setErrorCode(error.code());
        responseData.setThrottleTimeMs(0);
        this.client.prepareResponse(matcher, (AbstractResponse) new EndTxnResponse(responseData), shouldDisconnect);
    }

    /**
     * Calls the mock client's {@code respond} with an EndTxn response (given error code,
     * throttle time 0), guarded by the EndTxn request matcher.
     *
     * @param error      the error to put in the response
     * @param result     the commit/abort result the request must carry
     * @param producerId the producer id the request must carry
     * @param epoch      the producer epoch the request must carry
     */
    private void sendEndTxnResponse(Errors error, TransactionResult result, long producerId, short epoch) {
        MockClient.RequestMatcher matcher = this.endTxnMatcher(result, producerId, epoch);
        EndTxnResponseData responseData = new EndTxnResponseData();
        responseData.setErrorCode(error.code());
        responseData.setThrottleTimeMs(0);
        this.client.respond(matcher, (AbstractResponse) new EndTxnResponse(responseData));
    }

    /**
     * Builds a matcher for EndTxn requests that asserts the transactional id is {@code "foobar"}
     * and that the producer id, producer epoch and transaction result match the arguments.
     *
     * @param result     the commit/abort result the request must carry
     * @param producerId the producer id the request must carry
     * @param epoch      the producer epoch the request must carry
     * @return a matcher that returns {@code true} when all assertions pass
     */
    private MockClient.RequestMatcher endTxnMatcher(TransactionResult result, long producerId, short epoch) {
        return body -> {
            EndTxnRequest request = (EndTxnRequest) body;
            Assertions.assertEquals((Object) "foobar", (Object) request.data().transactionalId());
            Assertions.assertEquals(producerId, request.data().producerId());
            Assertions.assertEquals(epoch, request.data().producerEpoch());
            Assertions.assertEquals((Object) result, (Object) request.result());
            return true;
        };
    }

    /**
     * Queues an AddOffsetsToTxn response with the given error. The request matcher asserts the
     * consumer group id, the transactional id {@code "foobar"}, the producer id and the epoch.
     *
     * @param error           the error to put in the response
     * @param consumerGroupId the group id the request must carry
     * @param producerId      the producer id the request must carry
     * @param producerEpoch   the producer epoch the request must carry
     */
    private void prepareAddOffsetsToTxnResponse(Errors error, String consumerGroupId, long producerId, short producerEpoch) {
        MockClient.RequestMatcher matcher = body -> {
            AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) body;
            Assertions.assertEquals((Object) consumerGroupId, (Object) request.data().groupId());
            Assertions.assertEquals((Object) "foobar", (Object) request.data().transactionalId());
            Assertions.assertEquals(producerId, request.data().producerId());
            Assertions.assertEquals(producerEpoch, request.data().producerEpoch());
            return true;
        };
        AddOffsetsToTxnResponseData responseData = new AddOffsetsToTxnResponseData();
        responseData.setErrorCode(error.code());
        this.client.prepareResponse(matcher, (AbstractResponse) new AddOffsetsToTxnResponse(responseData));
    }

    /**
     * Queues a TxnOffsetCommit response with the given per-partition errors. The request matcher
     * asserts the consumer group id, the producer id and the producer epoch.
     *
     * @param consumerGroupId         the group id the request must carry
     * @param producerId              the producer id the request must carry
     * @param producerEpoch           the producer epoch the request must carry
     * @param txnOffsetCommitResponse the error to report for each partition
     */
    private void prepareTxnOffsetCommitResponse(String consumerGroupId, long producerId, short producerEpoch, Map<TopicPartition, Errors> txnOffsetCommitResponse) {
        MockClient.RequestMatcher matcher = request -> {
            TxnOffsetCommitRequest commitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals((Object) consumerGroupId, (Object) commitRequest.data().groupId());
            Assertions.assertEquals(producerId, commitRequest.data().producerId());
            Assertions.assertEquals(producerEpoch, commitRequest.data().producerEpoch());
            return true;
        };
        this.client.prepareResponse(matcher, (AbstractResponse) new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
    }

    /**
     * Queues a TxnOffsetCommit response with the given per-partition errors. In addition to the
     * group id, producer id and epoch, the request matcher asserts the consumer group metadata
     * fields: group instance id, member id and generation id.
     *
     * @param consumerGroupId         the group id the request must carry
     * @param producerId              the producer id the request must carry
     * @param producerEpoch           the producer epoch the request must carry
     * @param groupInstanceId         the group instance id the request must carry
     * @param memberId                the member id the request must carry
     * @param generationId            the generation id the request must carry
     * @param txnOffsetCommitResponse the error to report for each partition
     */
    private void prepareTxnOffsetCommitResponse(String consumerGroupId, long producerId, short producerEpoch, String groupInstanceId, String memberId, int generationId, Map<TopicPartition, Errors> txnOffsetCommitResponse) {
        MockClient.RequestMatcher matcher = request -> {
            TxnOffsetCommitRequest commitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals((Object) consumerGroupId, (Object) commitRequest.data().groupId());
            Assertions.assertEquals(producerId, commitRequest.data().producerId());
            Assertions.assertEquals(producerEpoch, commitRequest.data().producerEpoch());
            Assertions.assertEquals((Object) groupInstanceId, (Object) commitRequest.data().groupInstanceId());
            Assertions.assertEquals((Object) memberId, (Object) commitRequest.data().memberId());
            Assertions.assertEquals(generationId, (int) commitRequest.data().generationId());
            return true;
        };
        this.client.prepareResponse(matcher, (AbstractResponse) new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
    }

    /**
     * Builds a single-partition produce response with a log start offset of 10; see the
     * five-argument overload.
     *
     * @param tp             the partition the response is for
     * @param offset         the offset passed to the partition response
     * @param error          the error to report for the partition
     * @param throttleTimeMs the throttle time of the response
     * @return the produce response
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        final int defaultLogStartOffset = 10;
        return this.produceResponse(tp, offset, error, throttleTimeMs, defaultLogStartOffset);
    }

    /**
     * Builds a produce response containing exactly one partition response for {@code tp}.
     *
     * @param tp             the partition the response is for
     * @param offset         the offset passed to the partition response
     * @param error          the error to report for the partition
     * @param throttleTimeMs the throttle time of the response
     * @param logStartOffset the log start offset passed to the partition response
     * @return the produce response
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, int logStartOffset) {
        ProduceResponse.PartitionResponse partitionResponse =
                new ProduceResponse.PartitionResponse(error, offset, -1L, (long) logStartOffset);
        return new ProduceResponse(Collections.singletonMap(tp, partitionResponse), throttleTimeMs);
    }

    /**
     * Queues a successful InitProducerId response with the given id and epoch, asserting that
     * the request carries no transactional id, and then runs until the transaction manager
     * reports that it has a producer id.
     *
     * @param producerId the producer id to return
     * @param epoch      the producer epoch to return
     */
    private void initializeIdempotentProducerId(long producerId, short epoch) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData();
        responseData.setErrorCode(Errors.NONE.code());
        responseData.setProducerEpoch(epoch);
        responseData.setProducerId(producerId);
        responseData.setThrottleTimeMs(0);

        MockClient.RequestMatcher matcher = body -> {
            InitProducerIdRequest request = (InitProducerIdRequest) body;
            Assertions.assertNull((Object) request.data().transactionalId());
            return true;
        };
        this.client.prepareResponse(matcher, (AbstractResponse) new InitProducerIdResponse(responseData), false);
        this.runUntil(() -> this.transactionManager.hasProducerId());
    }

    /**
     * Initializes transactions with producer id 13131 and epoch 1; see the two-argument overload.
     */
    private void doInitTransactions() {
        final long defaultProducerId = 13131L;
        final short defaultEpoch = 1;
        this.doInitTransactions(defaultProducerId, defaultEpoch);
    }

    /**
     * Drives {@code initializeTransactions()} to completion: answers the FindCoordinator request
     * for transactional id {@code "foobar"}, checks that the discovered coordinator is
     * {@code brokerNode}, answers InitProducerId with the given id and epoch, and asserts that
     * the result is successful and acked.
     *
     * @param producerId the producer id returned by InitProducerId
     * @param epoch      the producer epoch returned by InitProducerId
     */
    private void doInitTransactions(long producerId, short epoch) {
        FindCoordinatorRequest.CoordinatorType txnCoordinatorType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        TransactionalRequestResult initResult = this.transactionManager.initializeTransactions();

        // Coordinator discovery.
        this.prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinatorType, "foobar");
        this.runUntil(() -> this.transactionManager.coordinator(txnCoordinatorType) != null);
        Node coordinator = this.transactionManager.coordinator(txnCoordinatorType);
        Assertions.assertEquals((Object) this.brokerNode, (Object) coordinator);

        // Producer id assignment.
        this.prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
        this.runUntil(() -> this.transactionManager.hasProducerId());

        initResult.await();
        Assertions.assertTrue(initResult.isSuccessful());
        Assertions.assertTrue(initResult.isAcked());
    }

    /**
     * Asserts that the transaction manager is in an abortable error state: {@code beginCommit()}
     * must throw a {@link KafkaException} whose cause is of the given type while the manager
     * still reports an error, and a subsequent {@code beginAbort()} must clear the error.
     *
     * @param cause the expected type (or supertype) of the exception's cause
     */
    private void assertAbortableError(Class<? extends RuntimeException> cause) {
        KafkaException thrown = Assertions.assertThrows(
                KafkaException.class,
                () -> this.transactionManager.beginCommit(),
                "Should have raised " + cause.getSimpleName());
        Assertions.assertTrue(cause.isAssignableFrom(thrown.getCause().getClass()),
                "Unexpected cause: " + thrown.getCause());
        Assertions.assertTrue(this.transactionManager.hasError());

        // Starting the abort is what moves the manager out of the error state.
        this.transactionManager.beginAbort();
        Assertions.assertFalse(this.transactionManager.hasError());
    }

    /**
     * Asserts that the transaction manager is in a fatal error state: it reports an error, and
     * {@code beginAbort()} throws a {@link KafkaException} whose cause is of the given type.
     * The abort is attempted twice to check that the first attempt does not clear the error.
     *
     * @param cause the expected type (or supertype) of the exception's cause
     */
    private void assertFatalError(Class<? extends RuntimeException> cause) {
        Assertions.assertTrue(this.transactionManager.hasError());

        for (int attempt = 1; attempt <= 2; attempt++) {
            KafkaException thrown = Assertions.assertThrows(
                    KafkaException.class,
                    () -> this.transactionManager.beginAbort(),
                    "Should have raised " + cause.getSimpleName());
            Assertions.assertTrue(cause.isAssignableFrom(thrown.getCause().getClass()),
                    "Unexpected cause on abort attempt " + attempt + ": " + thrown.getCause());
            Assertions.assertTrue(this.transactionManager.hasError());
        }
    }

    /**
     * Asserts that the given produce future is already done and that retrieving its value
     * throws an {@link ExecutionException}.
     *
     * @param future the produce future to check
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void assertProduceFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assertions.assertTrue(future.isDone());
        boolean failedAsExpected = false;
        try {
            future.get();
        } catch (ExecutionException expected) {
            // This is the outcome under test; nothing further to inspect.
            failedAsExpected = true;
        }
        if (!failedAsExpected) {
            Assertions.fail("Expected produce future to throw");
        }
    }

    /**
     * Delegates to {@code ProducerTestUtils.runUntil} with this test's {@code sender} field and
     * the given condition.
     *
     * @param condition the condition to wait for
     */
    private void runUntil(Supplier<Boolean> condition) {
        Sender driver = this.sender;
        ProducerTestUtils.runUntil(driver, condition);
    }
}

