package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest.class */
public class KafkaInputOperatorTest extends KafkaOperatorTestBase {
    /** Number of brokers in the current scenario; computed in the constructor from the two boolean flags. */
    private int totalBrokers;
    /** Partition strategy name handed to the input operator's {@code setStrategy}. */
    private String partition;
    /** Topic name of the running test case; assigned in {@link #before()}. */
    private String testName = "";

    @Rule
    public final KafkaTestInfo testInfo = new KafkaTestInfo();
    private static final int scale = 2;
    /** Number of messages the test producer is asked to send. */
    private static final int totalCount = 20;
    /** Value of the tuple counter {@code k} at which the collector throws its injected failure. */
    private static final int failureTrigger = 6;
    /** Upper bound passed to {@code setMaxTuplesPerWindow} in the failure scenarios. */
    private static final int tuplesPerWindow = 10;
    /** Timeout in milliseconds for one test run. */
    private static final int waitTime = 60600;
    /** Counted down by the collector for every END_TUPLE it receives. */
    private static CountDownLatch latch;
    /** Thread running the local cluster; interrupted by the collector once the latch reaches zero. */
    private static Thread monitorThread;
    public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
    /** All tuples received by the collector during the current test case. */
    private static List<String> tupleCollection = new LinkedList<String>();
    /** While true, the collector throws once when the tuple counter hits {@link #failureTrigger}. */
    private static boolean hasFailure = false;
    private static final boolean countDownAll = false;
    /** Tuple counter for failure injection; must be an int (it is incremented and compared to {@link #failureTrigger}). */
    private static int k = 0;

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        long currentWindowId;
        long operatorId;
        public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>() { // from class: org.apache.apex.malhar.kafka.KafkaInputOperatorTest.CollectorModule.1
            public void process(byte[] bArr) {
                CollectorModule.this.processTuple(bArr);
            }
        };
        boolean isIdempotentTest = false;
        transient List<String> windowTupleCollector = Lists.newArrayList();
        private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap();
        private int endTuples = KafkaInputOperatorTest.countDownAll;

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.operatorId = operatorContext.getId();
        }

        public void beginWindow(long j) {
            super.beginWindow(j);
            this.currentWindowId = j;
            this.windowTupleCollector.clear();
            this.endTuples = KafkaInputOperatorTest.countDownAll;
        }

        public void processTuple(byte[] bArr) {
            String str = new String(bArr);
            if (KafkaInputOperatorTest.hasFailure && KafkaInputOperatorTest.access$208() == KafkaInputOperatorTest.failureTrigger) {
                boolean unused = KafkaInputOperatorTest.hasFailure = false;
                throw new RuntimeException();
            }
            if (str.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
                this.endTuples++;
            }
            this.windowTupleCollector.add(str);
        }

        public void endWindow() {
            super.endWindow();
            if (this.isIdempotentTest) {
                String str = this.operatorId + "," + this.currentWindowId;
                List<String> list = this.tupleCollectedInWindow.get(str);
                if (list != null) {
                    Assert.assertEquals("replay messages should be exactly same as previous window", list, this.windowTupleCollector);
                } else {
                    ArrayList newArrayList = Lists.newArrayList();
                    newArrayList.addAll(this.windowTupleCollector);
                    this.tupleCollectedInWindow.put(str, newArrayList);
                }
            }
            this.windowTupleCollector.size();
            KafkaInputOperatorTest.tupleCollection.addAll(this.windowTupleCollector);
            int i = this.endTuples;
            if (KafkaInputOperatorTest.latch != null) {
                Assert.assertTrue("received END_TUPLES more than expected.", KafkaInputOperatorTest.latch.getCount() >= ((long) i));
                while (i > 0) {
                    KafkaInputOperatorTest.latch.countDown();
                    i--;
                }
                if (KafkaInputOperatorTest.latch.getCount() == 0) {
                    try {
                        throw new Operator.ShutdownException();
                    } catch (Throwable th) {
                        KafkaInputOperatorTest.monitorThread.interrupt();
                        throw th;
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaInputOperatorTest$KafkaTestInfo.class */
    public class KafkaTestInfo extends TestWatcher {
        public Description desc;

        public KafkaTestInfo() {
        }

        public String getDir() {
            return "target/" + this.desc.getClassName() + "/" + this.desc.getMethodName() + "/" + KafkaInputOperatorTest.this.testName;
        }

        protected void starting(Description description) {
            this.desc = description;
        }
    }

    /**
     * Supplies the constructor arguments for each parameterized run:
     * {@code {multi-cluster, multi-partition, partition strategy}}.
     *
     * @return the eight flag/strategy combinations, in a fixed order
     */
    @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
    public static Collection<Object[]> testScenario() {
        final String oneToOne = "one_to_one";
        final String oneToMany = "one_to_many";
        final Object[][] scenarios = {
            {true, false, oneToOne},
            {true, false, oneToMany},
            {true, true, oneToOne},
            {true, true, oneToMany},
            {false, true, oneToOne},
            {false, true, oneToMany},
            {false, false, oneToOne},
            {false, false, oneToMany},
        };
        return Arrays.asList(scenarios);
    }

    /**
     * Prepares one test case: picks a fresh topic name from the shared counter, clears the
     * collected tuples, resets the failure-injection counter and creates the topic on the
     * first cluster (index 0) and, for multi-cluster scenarios, on the second (index 1).
     */
    @Before
    public void before() {
        this.testName = KafkaOperatorTestBase.TEST_TOPIC + testCounter++;
        logger.info("before() test case: {}", this.testName);
        tupleCollection.clear();
        // The counter is an int; start every test case from zero.
        k = 0;
        createTopic(0, this.testName);
        if (this.hasMultiCluster) {
            createTopic(1, this.testName);
        }
    }

    public KafkaInputOperatorTest(boolean z, boolean z2, String str) {
        this.totalBrokers = countDownAll;
        this.partition = null;
        this.hasMultiCluster = z;
        this.hasMultiPartition = z2;
        this.totalBrokers = (1 + (z2 ? 1 : countDownAll)) * (1 + (z ? 1 : countDownAll));
        this.partition = str;
    }

    /** Runs the input operator scenario without failure injection and without a window data manager. */
    @Test
    public void testInputOperator() throws Exception {
        final boolean injectFailure = false;
        final boolean idempotent = false;
        hasFailure = injectFailure;
        testInputOperator(injectFailure, idempotent);
    }

    /** Runs the input operator scenario with the one-off collector failure enabled. */
    @Test
    public void testInputOperatorWithFailure() throws Exception {
        final boolean injectFailure = true;
        final boolean idempotent = false;
        hasFailure = injectFailure;
        testInputOperator(injectFailure, idempotent);
    }

    /** Runs the failure scenario with a window data manager installed and replay checking enabled. */
    @Test
    public void testIdempotentInputOperatorWithFailure() throws Exception {
        final boolean injectFailure = true;
        final boolean idempotent = true;
        hasFailure = injectFailure;
        testInputOperator(injectFailure, idempotent);
    }

    /**
     * Runs one end-to-end scenario: starts a test producer, builds a DAG of a
     * {@code KafkaSinglePortInputOperator} feeding a {@link CollectorModule}, runs it on a local
     * cluster and checks that the collector received the expected number of tuples in time.
     *
     * @param z  whether the failure-test settings (checkpoint every window, bounded tuples per
     *           window) are applied to the DAG
     * @param z2 whether an {@code FSWindowDataManager} is installed and the collector verifies
     *           that replayed windows are identical
     * @throws Exception if joining the producer thread is interrupted or an assertion fails
     */
    public void testInputOperator(boolean z, boolean z2) throws Exception {
        latch = new CountDownLatch(this.totalBrokers);
        logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", new Object[]{this.testName, this.totalBrokers, z, this.hasMultiCluster, this.hasMultiPartition, this.partition});

        // Produce the test messages in the background.
        KafkaTestProducer producer = new KafkaTestProducer(this.testName, this.hasMultiPartition, this.hasMultiCluster);
        producer.setSendCount(totalCount);
        Thread producerThread = new Thread(producer);
        producerThread.start();

        // Expected: the payload messages plus totalBrokers extra tuples
        // (presumably one END_TUPLE per broker; the latch is sized the same way).
        final int expectedTupleCount = totalCount + this.totalBrokers;

        LocalMode localMode = LocalMode.newInstance();
        DAG dag = localMode.getDAG();

        KafkaSinglePortInputOperator input = dag.addOperator("Kafka input" + this.testName, KafkaSinglePortInputOperator.class);
        input.setInitialPartitionCount(1);
        input.setTopics(this.testName);
        input.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        input.setClusters(getClusterConfig());
        input.setStrategy(this.partition);
        if (z2) {
            input.setWindowDataManager(new FSWindowDataManager());
        }

        CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
        collector.isIdempotentTest = z2;
        dag.addStream("Kafka message" + this.testName, input.outputPort, collector.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        if (z) {
            setupHasFailureTest(input, dag);
        }

        // Run the controller on a thread we own so it can be joined after shutdown.
        LocalMode.Controller controller = localMode.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        monitorThread = new Thread((StramLocalCluster) controller, "master");
        monitorThread.start();

        // Stays false unless the latch actually reached zero within the timeout.
        boolean finishedInTime = false;
        try {
            finishedInTime = latch.await(waitTime, TimeUnit.MILLISECONDS);
            controller.shutdown();
            monitorThread.join();
        } catch (InterruptedException e) {
            // Restore the flag so the interruption is not lost; the join below will surface it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for the local cluster to finish", e);
        } catch (Exception e) {
            logger.warn("Failure while shutting down the local cluster", e);
        }
        producerThread.join();

        final int receivedTupleCount = tupleCollection.size();
        if (!finishedInTime || expectedTupleCount != receivedTupleCount) {
            logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", new Object[]{receivedTupleCount, expectedTupleCount, this.testName, tupleCollection});
        }
        Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, finishedInTime);
        Assert.assertEquals("testName: " + this.testName + "; Collected tuple size: " + receivedTupleCount + "; Expected tuple size: " + expectedTupleCount + "; data: \n" + tupleCollection, expectedTupleCount, receivedTupleCount);
        logger.info("End of test case: {}", this.testName);
    }

    /**
     * Applies the extra settings used by the failure scenarios: a holding buffer of 5000,
     * a DAG checkpoint window count of 1 and a cap of {@code tuplesPerWindow} tuples per window.
     *
     * @param kafkaSinglePortInputOperator the input operator to configure
     * @param dag                          the DAG whose checkpoint window count is set
     */
    private void setupHasFailureTest(KafkaSinglePortInputOperator kafkaSinglePortInputOperator, DAG dag) {
        final int holdingBufferSize = 5000;
        final int checkpointWindowCount = 1;
        kafkaSinglePortInputOperator.setHoldingBufferSize(holdingBufferSize);
        dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, checkpointWindowCount);
        kafkaSinglePortInputOperator.setMaxTuplesPerWindow(tuplesPerWindow);
    }

    /**
     * Builds the cluster string for the input operator from the test broker ports.
     *
     * <p>Brokers of one cluster are separated by {@code ','} and clusters by {@code ';'}.
     * The first broker of the first cluster is always present; the others are added
     * according to {@code hasMultiPartition} and {@code hasMultiCluster}.
     *
     * @return for example {@code localhost:P00,localhost:P01;localhost:P10,localhost:P11}
     *         when both flags are set
     */
    private String getClusterConfig() {
        final String host = "localhost:";
        StringBuilder config = new StringBuilder(host).append(TEST_KAFKA_BROKER_PORT[0][0]);
        if (this.hasMultiPartition) {
            config.append(',').append(host).append(TEST_KAFKA_BROKER_PORT[0][1]);
        }
        if (this.hasMultiCluster) {
            config.append(';').append(host).append(TEST_KAFKA_BROKER_PORT[1][0]);
            if (this.hasMultiPartition) {
                config.append(',').append(host).append(TEST_KAFKA_BROKER_PORT[1][1]);
            }
        }
        return config.toString();
    }

    /**
     * Post-increments the tuple counter {@code k}.
     *
     * @return the value of {@code k} before the increment
     */
    static /* synthetic */ int access$208() {
        return k++;
    }
}
