package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.class */
public class KafkaOutputOperatorTest extends KafkaOperatorTestBase {
    String testName;
    private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
    private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
    private static List<Person> tupleCollection = new LinkedList();
    public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaOutputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort extends DefaultInputPort<byte[]> {
        CollectorModule ownerNode;

        CollectorInputPort(CollectorModule collectorModule) {
            this.ownerNode = collectorModule;
        }

        public void process(byte[] bArr) {
            KafkaOutputOperatorTest.tupleCollection.add(new KafkaHelper().m0deserialize("r", bArr));
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaOutputOperatorTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
        long currentWindowId;
        long operatorId;

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.operatorId = operatorContext.getId();
        }

        public void beginWindow(long j) {
            super.beginWindow(j);
            this.currentWindowId = j;
        }

        public void endWindow() {
            super.endWindow();
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaOutputOperatorTest$Person.class */
    public static class Person {
        public String name;
        public Integer age;

        public Person(String str, Integer num) {
            this.name = str;
            this.age = num;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Person person = (Person) obj;
            if (this.name != null) {
                if (!this.name.equals(person.name)) {
                    return false;
                }
            } else if (person.name != null) {
                return false;
            }
            return this.age != null ? this.age.equals(person.age) : person.age == null;
        }

        public int hashCode() {
            return (31 * (this.name != null ? this.name.hashCode() : 0)) + (this.age != null ? this.age.hashCode() : 0);
        }

        public String toString() {
            return this.name + this.age.toString();
        }
    }

    @Before
    public void before() {
        FileUtils.deleteQuietly(new File(APPLICATION_PATH));
        StringBuilder append = new StringBuilder().append(KafkaOperatorTestBase.TEST_TOPIC);
        int i = testCounter;
        testCounter = i + 1;
        this.testName = append.append(i).toString();
        createTopic(0, this.testName);
        if (this.hasMultiCluster) {
            createTopic(1, this.testName);
        }
    }

    @After
    public void after() {
        FileUtils.deleteQuietly(new File(APPLICATION_PATH));
    }

    @Test
    public void testExactlyOnceWithFailure() throws Exception {
        List<Person> GenerateList = GenerateList();
        sendDataToKafka(true, GenerateList, true, false);
        Assert.assertTrue("With Failure", compare(ReadFromKafka(), GenerateList));
    }

    @Test
    public void testExactlyOnceWithNoFailure() throws Exception {
        List<Person> GenerateList = GenerateList();
        sendDataToKafka(true, GenerateList, false, false);
        Assert.assertTrue("With No Failure", compare(ReadFromKafka(), GenerateList));
    }

    @Test
    public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception {
        try {
            sendDataToKafka(true, GenerateList(), true, true);
            Assert.assertTrue("Wrong tuples during replay, should throw exception", false);
        } catch (RuntimeException e) {
            boolean z = false;
            if (e.getMessage().contains("Violates")) {
                z = true;
            }
            Assert.assertTrue("Different tuples after recovery", z);
        }
    }

    @Test
    public void testKafkaOutput() throws Exception {
        List<Person> GenerateList = GenerateList();
        sendDataToKafka(false, GenerateList, false, false);
        Assert.assertTrue("No failure", compare(ReadFromKafka(), GenerateList));
    }

    @Test
    public void testKafkaOutputWithFailure() throws Exception {
        List<Person> GenerateList = GenerateList();
        sendDataToKafka(false, GenerateList, true, true);
        Assert.assertTrue("No failure", ReadFromKafka().size() > GenerateList.size());
    }

    private void sendDataToKafka(boolean z, List<Person> list, boolean z2, boolean z3) throws InterruptedException {
        DefaultInputPort defaultInputPort;
        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaSinglePortExactlyOnceOutputOperator;
        Properties properties = new Properties();
        properties.put("value.serializer", "org.apache.apex.malhar.kafka.KafkaHelper");
        if (!z) {
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
        properties.put("bootstrap.servers", getClusterConfig());
        properties.put("value.deserializer", "org.apache.apex.malhar.kafka.KafkaHelper");
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
        defaultAttributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
        Context.OperatorContext mockOperatorContext = OperatorContextTestHelper.mockOperatorContext(2, defaultAttributeMap);
        cleanUp(mockOperatorContext);
        if (z) {
            KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput = ResetKafkaOutput(this.testName, properties, mockOperatorContext);
            defaultInputPort = ResetKafkaOutput.inputPort;
            kafkaSinglePortExactlyOnceOutputOperator = ResetKafkaOutput;
        } else {
            KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaSimpleOutput = ResetKafkaSimpleOutput(this.testName, properties, mockOperatorContext);
            defaultInputPort = ((KafkaSinglePortOutputOperator) ResetKafkaSimpleOutput).inputPort;
            kafkaSinglePortExactlyOnceOutputOperator = ResetKafkaSimpleOutput;
        }
        kafkaSinglePortExactlyOnceOutputOperator.beginWindow(1L);
        defaultInputPort.getSink().put(list.get(0));
        defaultInputPort.getSink().put(list.get(1));
        defaultInputPort.getSink().put(list.get(2));
        kafkaSinglePortExactlyOnceOutputOperator.endWindow();
        kafkaSinglePortExactlyOnceOutputOperator.beginWindow(2L);
        defaultInputPort.getSink().put(list.get(3));
        defaultInputPort.getSink().put(list.get(4));
        defaultInputPort.getSink().put(list.get(5));
        kafkaSinglePortExactlyOnceOutputOperator.endWindow();
        kafkaSinglePortExactlyOnceOutputOperator.beginWindow(3L);
        defaultInputPort.getSink().put(list.get(6));
        defaultInputPort.getSink().put(list.get(7));
        if (z2) {
            if (z) {
                KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput2 = ResetKafkaOutput(this.testName, properties, mockOperatorContext);
                defaultInputPort = ResetKafkaOutput2.inputPort;
                kafkaSinglePortExactlyOnceOutputOperator = ResetKafkaOutput2;
            } else {
                KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaSimpleOutput2 = ResetKafkaSimpleOutput(this.testName, properties, mockOperatorContext);
                defaultInputPort = ((KafkaSinglePortOutputOperator) ResetKafkaSimpleOutput2).inputPort;
                kafkaSinglePortExactlyOnceOutputOperator = ResetKafkaSimpleOutput2;
            }
            kafkaSinglePortExactlyOnceOutputOperator.beginWindow(2L);
            defaultInputPort.getSink().put(list.get(3));
            defaultInputPort.getSink().put(list.get(4));
            defaultInputPort.getSink().put(list.get(5));
            kafkaSinglePortExactlyOnceOutputOperator.endWindow();
            kafkaSinglePortExactlyOnceOutputOperator.beginWindow(3L);
            defaultInputPort.getSink().put(list.get(6));
            if (!z3) {
                defaultInputPort.getSink().put(list.get(7));
            }
        }
        defaultInputPort.getSink().put(list.get(8));
        defaultInputPort.getSink().put(list.get(9));
        kafkaSinglePortExactlyOnceOutputOperator.endWindow();
        kafkaSinglePortExactlyOnceOutputOperator.beginWindow(4L);
        defaultInputPort.getSink().put(list.get(10));
        defaultInputPort.getSink().put(list.get(11));
        kafkaSinglePortExactlyOnceOutputOperator.endWindow();
        cleanUp(mockOperatorContext);
    }

    private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(String str, Properties properties, Context.OperatorContext operatorContext) {
        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaSinglePortExactlyOnceOutputOperator = new KafkaSinglePortExactlyOnceOutputOperator<>();
        kafkaSinglePortExactlyOnceOutputOperator.setTopic(str);
        kafkaSinglePortExactlyOnceOutputOperator.setProperties(properties);
        kafkaSinglePortExactlyOnceOutputOperator.setup(operatorContext);
        return kafkaSinglePortExactlyOnceOutputOperator;
    }

    private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(String str, Properties properties, Context.OperatorContext operatorContext) {
        KafkaSinglePortOutputOperator<String, Person> kafkaSinglePortOutputOperator = new KafkaSinglePortOutputOperator<>();
        kafkaSinglePortOutputOperator.setTopic(str);
        kafkaSinglePortOutputOperator.setProperties(properties);
        kafkaSinglePortOutputOperator.setup(operatorContext);
        return kafkaSinglePortOutputOperator;
    }

    private void cleanUp(Context.OperatorContext operatorContext) {
        FSWindowDataManager fSWindowDataManager = new FSWindowDataManager();
        fSWindowDataManager.setup(operatorContext);
        try {
            fSWindowDataManager.committed(fSWindowDataManager.getLargestCompletedWindow());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private boolean compare(List<Person> list, List<Person> list2) {
        if (list.size() != list2.size()) {
            return false;
        }
        for (int i = 0; i < list.size(); i++) {
            if (!list.get(i).equals(list2.get(i))) {
                return false;
            }
        }
        return true;
    }

    private String getClusterConfig() {
        return "localhost:" + TEST_KAFKA_BROKER_PORT[0][0] + (this.hasMultiPartition ? ",localhost:" + TEST_KAFKA_BROKER_PORT[0][1] : "") + (this.hasMultiCluster ? ";localhost:" + TEST_KAFKA_BROKER_PORT[1][0] : "") + ((this.hasMultiCluster && this.hasMultiPartition) ? ",localhost:" + TEST_KAFKA_BROKER_PORT[1][1] : "");
    }

    private List<Person> GenerateList() {
        ArrayList arrayList = new ArrayList();
        int i = 0;
        while (true) {
            Integer num = i;
            if (num.intValue() >= 12) {
                return arrayList;
            }
            arrayList.add(new Person(num.toString(), num));
            i = Integer.valueOf(num.intValue() + 1);
        }
    }

    private List<Person> ReadFromKafka() {
        tupleCollection.clear();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getClusterConfig());
        properties.put("bootstrap.servers", getClusterConfig());
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.apex.malhar.kafka.KafkaHelper");
        properties.put("group.id", "KafkaTest");
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortInputOperator addOperator = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
        addOperator.setConsumerProps(properties);
        addOperator.setInitialPartitionCount(1);
        addOperator.setTopics(this.testName);
        addOperator.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        addOperator.setClusters(getClusterConfig());
        addOperator.setStrategy("one_to_one");
        dag.addStream("Kafka message", addOperator.outputPort, dag.addOperator("collector", new CollectorModule()).inputPort);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.run(30000L);
        return tupleCollection;
    }
}
