/*
 * Decompiled with CFR 0.152.
 */
package kafka.api;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import kafka.api.FixedPortTestUtils$;
import kafka.api.IntegrationTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.ShutdownableThread;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.Function3;
import scala.Predef$;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.BufferLike;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ListBuffer;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005}e\u0001B\u000e\u001d\u0001\u0005BQA\n\u0001\u0005\u0002\u001dBq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\be\u0001\u0011\r\u0011\"\u0003+\u0011\u0019\u0019\u0004\u0001)A\u0005W!9A\u0007\u0001b\u0001\n\u0013Q\u0003BB\u001b\u0001A\u0003%1\u0006C\u00047\u0001\t\u0007I\u0011B\u001c\t\r\u0001\u0003\u0001\u0015!\u00039\u0011\u001d\t\u0005A1A\u0005\n]BaA\u0011\u0001!\u0002\u0013A\u0004bB\"\u0001\u0005\u0004%\t\u0001\u0012\u0005\u0007\u0017\u0002\u0001\u000b\u0011B#\t\u000b1\u0003A\u0011I'\t\u000bi\u0003A\u0011\u000b\u0016\t\u000bm\u0003A\u0011\u0001/\t\u000b1\u0004A\u0011\u0001/\t\u000b9\u0004A\u0011B8\t\u000f\u00055\u0002\u0001\"\u0003\u00020!9\u0011Q\u0007\u0001\u0005\n\u0005]\u0002\"CA/\u0001E\u0005I\u0011BA0\u0011\u001d\t)\b\u0001C\u0005\u0003o2a!!\"\u0001\t\u0005\u001d\u0005B\u0002\u0014\u0018\t\u0003\t)\n\u0003\u0004\u0002\u001c^!\t\u0005\u0018\u0005\u0007\u0003;;B\u0011\t/\u0003-Q\u0013\u0018M\\:bGRLwN\\:C_Vt7-\u001a+fgRT!!\b\u0010\u0002\u0007\u0005\u0004\u0018NC\u0001 
\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u0012\u0011\u0005\r\"S\"\u0001\u000f\n\u0005\u0015b\"AF%oi\u0016<'/\u0019;j_:$Vm\u001d;ICJtWm]:\u0002\rqJg.\u001b;?)\u0005A\u0003CA\u0012\u0001\u0003I\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R\u0011AL\u0001\u0006g\u000e\fG.Y\u0005\u0003a5\u00121!\u00138u\u0003M\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3!\u0003U\u0019XM\u001d<fe6+7o]1hK6\u000b\u0007PQ=uKN\fac]3sm\u0016\u0014X*Z:tC\u001e,W*\u0019=CsR,7\u000fI\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\u0002\u001d9,X\u000eU1si&$\u0018n\u001c8tA\u0005Yq.\u001e;qkR$v\u000e]5d+\u0005A\u0004CA\u001d?\u001b\u0005Q$BA\u001e=\u0003\u0011a\u0017M\\4\u000b\u0003u\nAA[1wC&\u0011qH\u000f\u0002\u0007'R\u0014\u0018N\\4\u0002\u0019=,H\u000f];u)>\u0004\u0018n\u0019\u0011\u0002\u0015%t\u0007/\u001e;U_BL7-A\u0006j]B,H\u000fV8qS\u000e\u0004\u0013aD8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:\u0016\u0003\u0015\u0003\"AR%\u000e\u0003\u001dS!\u0001\u0013\u001f\u0002\tU$\u0018\u000e\\\u0005\u0003\u0015\u001e\u0013!\u0002\u0015:pa\u0016\u0014H/[3t\u0003Ayg/\u001a:sS\u0012Lgn\u001a)s_B\u001c\b%A\bhK:,'/\u0019;f\u0007>tg-[4t+\u0005q\u0005cA(S)6\t\u0001K\u0003\u0002R[\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005M\u0003&aA*fcB\u0011Q\u000bW\u0007\u0002-*\u0011qKH\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005e3&aC&bM.\f7i\u001c8gS\u001e\f1B\u0019:pW\u0016\u00148i\\;oi\u0006yA/Z:u/&$\bn\u0012:pkBLE\rF\u0001^!\tac,\u0003\u0002`[\t!QK\\5uQ\t\u0001\u0012\r\u0005\u0002cU6\t1M\u0003\u0002\u001eI*\u0011QMZ\u0001\bUV\u0004\u0018\u000e^3s\u0015\t9\u0007.A\u0003kk:LGOC\u0001j\u0003\ry'oZ\u0005\u0003W\u000e\u0014A\u0001V3ti\u0006)B/Z:u/&$\bn\u0012:pkBlU\r^1eCR\f\u0007FA\tb\u0003E!Xm\u001d;Ce>\\WM\u001d$bS2,(/\u001a\u000b\u0003;BDQ!\u001d\nA\u0002I\faaY8n[&$\b\u0003\u0003\u0017tk\u00065\u0011\u0011E/\n\u0005Ql#!\u0003$v]\u000e$\u0018n\u001c84!\u00191h0!\u0001\u0002\u00025\tqO\u0003\u0002ys\u0006A\u0001O]8ek\u000e,'O\u0003\u0002{w\u000691\
r\\5f]R\u001c(BA\u0010}\u0015\ti\b.\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003\u007f^\u0014QbS1gW\u0006\u0004&o\u001c3vG\u0016\u0014\b#\u0002\u0017\u0002\u0004\u0005\u001d\u0011bAA\u0003[\t)\u0011I\u001d:bsB\u0019A&!\u0003\n\u0007\u0005-QF\u0001\u0003CsR,\u0007\u0003BA\b\u0003;qA!!\u0005\u0002\u001aA\u0019\u00111C\u0017\u000e\u0005\u0005U!bAA\fA\u00051AH]8pizJ1!a\u0007.\u0003\u0019\u0001&/\u001a3fM&\u0019q(a\b\u000b\u0007\u0005mQ\u0006\u0005\u0005\u0002$\u0005%\u0012\u0011AA\u0001\u001b\t\t)CC\u0002\u0002(e\f\u0001bY8ogVlWM]\u0005\u0005\u0003W\t)CA\u0007LC\u001a\\\u0017mQ8ogVlWM]\u0001\u001cGJ,\u0017\r^3Ue\u0006t7/Y2uS>t\u0017\r\u001c)s_\u0012,8-\u001a:\u0015\u0007U\f\t\u0004C\u0004\u00024M\u0001\r!!\u0004\u0002\u001fQ\u0014\u0018M\\:bGRLwN\\1m\u0013\u0012\f!d\u0019:fCR,7i\u001c8tk6,'/\u00118e'V\u00147o\u0019:jE\u0016$\u0002\"!\t\u0002:\u0005u\u00121\u000b\u0005\b\u0003w!\u0002\u0019AA\u0007\u0003\u001d9'o\\;q\u0013\u0012Dq!a\u0010\u0015\u0001\u0004\t\t%\u0001\u0004u_BL7m\u001d\t\u0007\u0003\u0007\ni%!\u0004\u000f\t\u0005\u0015\u0013\u0011\n\b\u0005\u0003'\t9%C\u0001/\u0013\r\tY%L\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\ty%!\u0015\u0003\t1K7\u000f\u001e\u0006\u0004\u0003\u0017j\u0003\"CA+)A\u0005\t\u0019AA,\u00035\u0011X-\u00193D_6l\u0017\u000e\u001e;fIB\u0019A&!\u0017\n\u0007\u0005mSFA\u0004C_>dW-\u00198\u0002I\r\u0014X-\u0019;f\u0007>t7/^7fe\u0006sGmU;cg\u000e\u0014\u0018NY3%I\u00164\u0017-\u001e7uIM*\"!!\u0019+\t\u0005]\u00131M\u0016\u0003\u0003K\u0002B!a\u001a\u0002r5\u0011\u0011\u0011\u000e\u0006\u0005\u0003W\ni'A\u0005v]\u000eDWmY6fI*\u0019\u0011qN\u0017\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002t\u0005%$!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006a1M]3bi\u0016$v\u000e]5dgR\u0011\u0011\u0011\u0010\t\u0007\u0003w\n\tiK\u0016\u000e\u0005\u0005u$bAA@!\u0006I\u0011.\\7vi\u0006\u0014G.Z\u0005\u0005\u0003\u0007\u000biHA\u0002NCB\u0014qBQ8v]\u000e,7k\u00195fIVdWM]\n\u0004/\u0005%\u0005\u0003BAF\u0003#k!!!$\u000b\u0007\u0005=e$A\u0003vi&d7/\u0003\u0003\u
0002\u0014\u00065%AE*ikR$wn\u001e8bE2,G\u000b\u001b:fC\u0012$\"!a&\u0011\u0007\u0005eu#D\u0001\u0001\u0003\u0019!wnV8sW\u0006A1\u000f[;uI><h\u000e")
public class TransactionsBounceTest
extends IntegrationTestHarness {
    // Producer buffer size; assigned in the constructor.
    private final int producerBufferSize;
    // Half of producerBufferSize(); the constructor passes it to the brokers as the max message size.
    // NOTE(review): this initializer runs before the constructor body assigns producerBufferSize,
    // so under Java initialization order it evaluates to 0 / 2 == 0 — confirm the intended value.
    private final int serverMessageMaxBytes = this.producerBufferSize() / 2;
    // Partition count used for both topics in createTopics() and iterated by BounceScheduler.doWork().
    private final int kafka$api$TransactionsBounceTest$$numPartitions;
    // Topic the transactional producer writes to and the verifying consumer reads from.
    private final String kafka$api$TransactionsBounceTest$$outputTopic;
    // Topic seeded with numbered records and read by the transactional consume loop.
    private final String inputTopic;
    // Broker property overrides; populated in the constructor and applied in generateConfigs().
    private final Properties overridingProps = new Properties();

    /** Accessor for the {@code producerBufferSize} field. */
    private int producerBufferSize() {
        return producerBufferSize;
    }

    /**
     * Returns the value the constructor publishes as the broker max message size: half of
     * {@link #producerBufferSize()}.
     *
     * <p>The value is derived on every call rather than read from the {@code serverMessageMaxBytes}
     * field. That field's initializer executes before the constructor body assigns
     * {@code producerBufferSize}, so the field holds 0 instead of half the buffer size.
     *
     * @return {@code producerBufferSize() / 2}
     */
    private int serverMessageMaxBytes() {
        return this.producerBufferSize() / 2;
    }

    /** Accessor for the partition-count field; public so that {@code BounceScheduler} can call it. */
    public int kafka$api$TransactionsBounceTest$$numPartitions() {
        return kafka$api$TransactionsBounceTest$$numPartitions;
    }

    /** Accessor for the output-topic name field; public so that {@code BounceScheduler} can call it. */
    public String kafka$api$TransactionsBounceTest$$outputTopic() {
        return kafka$api$TransactionsBounceTest$$outputTopic;
    }

    /** Accessor for the input-topic name field. */
    private String inputTopic() {
        return inputTopic;
    }

    /** Returns the mutable broker property overrides that the constructor fills in. */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds one {@code KafkaConfig} per broker: asks {@code FixedPortTestUtils} for
     * {@link #brokerCount()} broker property sets and layers {@link #overridingProps()} on each.
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        FixedPortTestUtils$ portUtils = FixedPortTestUtils$.MODULE$;
        // Converts a single broker's properties into a KafkaConfig with this test's overrides applied.
        Function1 toKafkaConfig = (Function1 & Serializable & scala.Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, this.overridingProps());
        return (Seq)portUtils.createBrokerConfigs(this.brokerCount(), this.zkConnect(), true, portUtils.createBrokerConfigs$default$4()).map(toKafkaConfig, Seq$.MODULE$.canBuildFrom());
    }

    /** Number of brokers this test runs with. */
    @Override
    public int brokerCount() {
        final int brokers = 4;
        return brokers;
    }

    @Test
    public void testWithGroupId() {
        String testBrokerFailure_consumerGroup = "myGroup";
        int testBrokerFailure_numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), testBrokerFailure_numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> testBrokerFailure_consumer = this.createConsumerAndSubscribe(testBrokerFailure_consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribe$default$3());
        KafkaProducer<byte[], byte[]> testBrokerFailure_producer = this.createTransactionalProducer("test-txn");
        testBrokerFailure_producer.initTransactions();
        BounceScheduler testBrokerFailure_scheduler = new BounceScheduler();
        testBrokerFailure_scheduler.start();
        try {
            IntRef testBrokerFailure_numMessagesProcessed = IntRef.create((int)0);
            IntRef testBrokerFailure_iteration = IntRef.create((int)0);
            while (testBrokerFailure_numMessagesProcessed.elem < testBrokerFailure_numInputRecords) {
                int testBrokerFailure_toRead = Math.min(200, testBrokerFailure_numInputRecords - testBrokerFailure_numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> testBrokerFailure_records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_consumer, testBrokerFailure_toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                testBrokerFailure_producer.beginTransaction();
                boolean testBrokerFailure_shouldAbort = testBrokerFailure_iteration.elem % 3 == 0;
                testBrokerFailure_records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                testBrokerFailure_producer.sendOffsetsToTransaction((Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)testBrokerFailure_consumer)).asJava(), testBrokerFailure_consumerGroup);
                if (testBrokerFailure_shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(testBrokerFailure_consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.commitTransaction();
                    testBrokerFailure_numMessagesProcessed.elem += testBrokerFailure_records.size();
                }
                ++testBrokerFailure_iteration.elem;
            }
        }
        finally {
            testBrokerFailure_scheduler.shutdown();
        }
        KafkaConsumer<byte[], byte[]> testBrokerFailure_verifyingConsumer = this.createConsumerAndSubscribe("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap testBrokerFailure_recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_verifyingConsumer, testBrokerFailure_numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer testBrokerFailure_outputRecords = new ListBuffer();
        testBrokerFailure_recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)partitionValues -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, partitionValues);
            return BoxedUnit.UNIT;
        });
        Set testBrokerFailure_recordSet = testBrokerFailure_outputRecords.toSet();
        Assertions.assertEquals((int)testBrokerFailure_numInputRecords, (int)testBrokerFailure_recordSet.size());
        Set testBrokerFailure_expectedValues = RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), testBrokerFailure_numInputRecords).toSet();
        Assertions.assertEquals((Object)testBrokerFailure_expectedValues, (Object)testBrokerFailure_recordSet, (String)new StringBuilder(18).append("Missing messages: ").append(testBrokerFailure_expectedValues.$minus$minus((GenTraversableOnce)testBrokerFailure_recordSet)).toString());
    }

    @Test
    public void testWithGroupMetadata() {
        String testBrokerFailure_consumerGroup = "myGroup";
        int testBrokerFailure_numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), testBrokerFailure_numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> testBrokerFailure_consumer = this.createConsumerAndSubscribe(testBrokerFailure_consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribe$default$3());
        KafkaProducer<byte[], byte[]> testBrokerFailure_producer = this.createTransactionalProducer("test-txn");
        testBrokerFailure_producer.initTransactions();
        BounceScheduler testBrokerFailure_scheduler = new BounceScheduler();
        testBrokerFailure_scheduler.start();
        try {
            IntRef testBrokerFailure_numMessagesProcessed = IntRef.create((int)0);
            IntRef testBrokerFailure_iteration = IntRef.create((int)0);
            while (testBrokerFailure_numMessagesProcessed.elem < testBrokerFailure_numInputRecords) {
                int testBrokerFailure_toRead = Math.min(200, testBrokerFailure_numInputRecords - testBrokerFailure_numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> testBrokerFailure_records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_consumer, testBrokerFailure_toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                testBrokerFailure_producer.beginTransaction();
                boolean testBrokerFailure_shouldAbort = testBrokerFailure_iteration.elem % 3 == 0;
                testBrokerFailure_records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                testBrokerFailure_producer.sendOffsetsToTransaction((Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)testBrokerFailure_consumer)).asJava(), testBrokerFailure_consumer.groupMetadata());
                if (testBrokerFailure_shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(testBrokerFailure_consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.commitTransaction();
                    testBrokerFailure_numMessagesProcessed.elem += testBrokerFailure_records.size();
                }
                ++testBrokerFailure_iteration.elem;
            }
        }
        finally {
            testBrokerFailure_scheduler.shutdown();
        }
        KafkaConsumer<byte[], byte[]> testBrokerFailure_verifyingConsumer = this.createConsumerAndSubscribe("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap testBrokerFailure_recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_verifyingConsumer, testBrokerFailure_numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer testBrokerFailure_outputRecords = new ListBuffer();
        testBrokerFailure_recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)partitionValues -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, partitionValues);
            return BoxedUnit.UNIT;
        });
        Set testBrokerFailure_recordSet = testBrokerFailure_outputRecords.toSet();
        Assertions.assertEquals((int)testBrokerFailure_numInputRecords, (int)testBrokerFailure_recordSet.size());
        Set testBrokerFailure_expectedValues = RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), testBrokerFailure_numInputRecords).toSet();
        Assertions.assertEquals((Object)testBrokerFailure_expectedValues, (Object)testBrokerFailure_recordSet, (String)new StringBuilder(18).append("Missing messages: ").append(testBrokerFailure_expectedValues.$minus$minus((GenTraversableOnce)testBrokerFailure_recordSet)).toString());
    }

    private void testBrokerFailure(Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> commit) {
        String consumerGroup = "myGroup";
        int numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> consumer = this.createConsumerAndSubscribe(consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribe$default$3());
        KafkaProducer<byte[], byte[]> producer = this.createTransactionalProducer("test-txn");
        producer.initTransactions();
        BounceScheduler scheduler = new BounceScheduler();
        scheduler.start();
        try {
            IntRef numMessagesProcessed = IntRef.create((int)0);
            IntRef iteration = IntRef.create((int)0);
            while (numMessagesProcessed.elem < numInputRecords) {
                int toRead = Math.min(200, numInputRecords - numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                producer.beginTransaction();
                boolean shouldAbort = iteration.elem % 3 == 0;
                records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                commit.apply(producer, (Object)consumerGroup, consumer);
                if (shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    producer.commitTransaction();
                    numMessagesProcessed.elem += records.size();
                }
                ++iteration.elem;
            }
        }
        finally {
            scheduler.shutdown();
        }
        KafkaConsumer<byte[], byte[]> verifyingConsumer = this.createConsumerAndSubscribe("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer outputRecords = new ListBuffer();
        recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)partitionValues -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, partitionValues);
            return BoxedUnit.UNIT;
        });
        Set recordSet = outputRecords.toSet();
        Assertions.assertEquals((int)numInputRecords, (int)recordSet.size());
        Set expectedValues = RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numInputRecords).toSet();
        Assertions.assertEquals((Object)expectedValues, (Object)recordSet, (String)new StringBuilder(18).append("Missing messages: ").append(expectedValues.$minus$minus((GenTraversableOnce)recordSet)).toString());
    }

    /**
     * Creates a producer configured for transactions: {@code acks=all}, a 512-byte batch size,
     * idempotence enabled and the given transactional id. The two leading {@code createProducer}
     * arguments are left at their defaults.
     *
     * @param transactionalId value for the {@code transactional.id} producer setting
     * @return the producer returned by {@code createProducer}
     */
    private KafkaProducer<byte[], byte[]> createTransactionalProducer(String transactionalId) {
        Properties producerSettings = new Properties();
        producerSettings.setProperty("acks", "all");
        producerSettings.setProperty("batch.size", "512");
        producerSettings.setProperty("transactional.id", transactionalId);
        producerSettings.setProperty("enable.idempotence", "true");
        return this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), producerSettings);
    }

    /**
     * Creates a consumer with auto-commit disabled and subscribes it to the given topics.
     *
     * @param groupId       value for the {@code group.id} consumer setting
     * @param topics        topics to subscribe to
     * @param readCommitted {@code true} selects {@code read_committed} isolation,
     *                      {@code false} selects {@code read_uncommitted}
     * @return the subscribed consumer
     */
    private KafkaConsumer<byte[], byte[]> createConsumerAndSubscribe(String groupId, List<String> topics, boolean readCommitted) {
        String isolationLevel;
        if (readCommitted) {
            isolationLevel = "read_committed";
        } else {
            isolationLevel = "read_uncommitted";
        }
        Properties settings = new Properties();
        settings.put("group.id", groupId);
        settings.put("enable.auto.commit", "false");
        settings.put("isolation.level", isolationLevel);
        KafkaConsumer subscribed = this.createConsumer(this.createConsumer$default$1(), this.createConsumer$default$2(), settings, this.createConsumer$default$4());
        Collection javaTopics = (Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter(topics).asJava();
        subscribed.subscribe(javaTopics);
        return subscribed;
    }

    /** Default for the third ({@code readCommitted}) argument of {@code createConsumerAndSubscribe}. */
    private boolean createConsumerAndSubscribe$default$3() {
        final boolean readCommittedByDefault = false;
        return readCommittedByDefault;
    }

    /**
     * Creates the input topic and then the output topic, both with the same partition count and a
     * topic-level min-in-sync-replicas setting of 2.
     *
     * @return the result of creating the output topic (the input topic's result is discarded)
     */
    private scala.collection.immutable.Map<Object, Object> createTopics() {
        Properties sharedTopicConfig = new Properties();
        sharedTopicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), String.valueOf(2));
        int partitions = this.kafka$api$TransactionsBounceTest$$numPartitions();
        this.createTopic(this.inputTopic(), partitions, 3, sharedTopicConfig);
        String outputTopicName = this.kafka$api$TransactionsBounceTest$$outputTopic();
        return this.createTopic(outputTopicName, partitions, 3, sharedTopicConfig);
    }

    /**
     * Per-record step of the verification pass: obtains the record's value through
     * {@code TestUtils.assertCommittedAndGetValue}, parses it as an int and appends it to the buffer
     * kept for the record's topic-partition, creating that buffer on first use.
     */
    public static final /* synthetic */ void $anonfun$testBrokerFailure$7(HashMap recordsByPartition$1, ConsumerRecord record) {
        String committedValue = TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record);
        int parsedValue = new StringOps(Predef$.MODULE$.augmentString(committedValue)).toInt();
        TopicPartition partitionKey = new TopicPartition(record.topic(), record.partition());
        BufferLike partitionBuffer = (BufferLike)recordsByPartition$1.getOrElseUpdate((Object)partitionKey, (Function0 & Serializable & scala.Serializable)() -> new ListBuffer());
        partitionBuffer.append((Seq)Predef$.MODULE$.wrapIntArray(new int[]{parsedValue}));
    }

    /**
     * Per-partition step of the verification pass: asserts that the partition's values equal their
     * own int-sorted copy, then appends them all to the combined output buffer.
     */
    public static final /* synthetic */ void $anonfun$testBrokerFailure$9(ListBuffer outputRecords$1, ListBuffer partitionValues) {
        Object inSortedOrder = partitionValues.sorted((Ordering)Ordering.Int$.MODULE$);
        Assertions.assertEquals((Object)partitionValues, inSortedOrder, "Out of order messages detected");
        outputRecords$1.appendAll((TraversableOnce)partitionValues);
    }

    public TransactionsBounceTest() {
        this.producerBufferSize = 65536;
        this.kafka$api$TransactionsBounceTest$$numPartitions = 3;
        this.kafka$api$TransactionsBounceTest$$outputTopic = "output-topic";
        this.inputTopic = "input-topic";
        this.overridingProps().put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), Boolean.toString(false));
        this.overridingProps().put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), Integer.toString(this.serverMessageMaxBytes()));
        this.overridingProps().put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), Boolean.toString(true));
        this.overridingProps().put(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), Boolean.toString(false));
        this.overridingProps().put(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), Boolean.toString(false));
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), Integer.toString(1));
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), Integer.toString(3));
        this.overridingProps().put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), Integer.toString(2));
        this.overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), Integer.toString(1));
        this.overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), Integer.toString(3));
        this.overridingProps().put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), "10");
        this.overridingProps().put(KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), "0");
    }

    /**
     * Commit callback for the group-id variant: sends the consumer's positions (as reported by
     * {@code TestUtils.consumerPositions}) to the producer's transaction under {@code groupId}.
     *
     * @return {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupId$1$adapted(KafkaProducer producer, String groupId, KafkaConsumer consumer) {
        Map positions = (Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)consumer)).asJava();
        producer.sendOffsetsToTransaction(positions, groupId);
        return BoxedUnit.UNIT;
    }

    /**
     * Commit callback for the group-metadata variant: sends the consumer's positions (as reported by
     * {@code TestUtils.consumerPositions}) to the producer's transaction together with
     * {@code consumer.groupMetadata()}. The {@code x$2} group-id argument is ignored.
     *
     * @return {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupMetadata$1$adapted(KafkaProducer producer, String x$2, KafkaConsumer consumer) {
        Map positions = (Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)consumer)).asJava();
        producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
        return BoxedUnit.UNIT;
    }

    private class BounceScheduler
    extends ShutdownableThread {
        public void doWork() {
            this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
                BounceScheduler.$anonfun$doWork$1(this, server);
                return BoxedUnit.UNIT;
            });
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$numPartitions()).foreach((Function1)(JFunction1.mcII.sp & Serializable & scala.Serializable)partition -> TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().zkClient(), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$outputTopic(), partition, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6()));
        }

        public void shutdown() {
            super.shutdown();
        }

        public /* synthetic */ TransactionsBounceTest kafka$api$TransactionsBounceTest$BounceScheduler$$$outer() {
            return TransactionsBounceTest.this;
        }

        public static final /* synthetic */ void $anonfun$doWork$1(BounceScheduler $this, KafkaServer server) {
            $this.trace((Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Shutting down server : %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            server.shutdown();
            server.awaitShutdown();
            Thread.sleep(500L);
            $this.trace((Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Server %s shut down. Starting it up again.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            server.startup();
            $this.trace((Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Restarted server: %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            Thread.sleep(500L);
        }

        public BounceScheduler() {
            if (TransactionsBounceTest.this == null) {
                throw null;
            }
            super("daemon-broker-bouncer", false);
        }
    }
}

