/*
 * Decompiled with CFR 0.152.
 */
package org.jsmart.zerocode.core.kafka.receive;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper;
import org.jsmart.zerocode.core.kafka.helper.KafkaFileRecordHelper;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a Kafka topic until it stays idle for a configured number of polls and
 * returns the collected records as a result string.
 *
 * <p>All Kafka-specific work (consumer creation, offset seeking, record decoding,
 * committing, dumping and result rendering) is delegated to
 * {@link KafkaConsumerHelper} and {@link KafkaFileRecordHelper}; this class only
 * drives the poll loop and owns the consumer's lifecycle.
 */
@Singleton
public class KafkaReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);
    private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
    /** Optionally injected value bound to the name {@code kafka.consumer.properties}; passed to the consumer factory. */
    @Inject(optional=true)
    @Named(value="kafka.consumer.properties")
    private String consumerPropertyFile;
    /** Injected common consumer settings, merged with the per-request settings on every call. */
    @Inject
    private ConsumerCommonConfigs consumerCommonConfigs;

    /**
     * Consumes records from {@code topicName} and returns the result prepared by
     * {@link KafkaConsumerHelper#prepareResult}.
     *
     * <p>Polling stops once the number of consecutive empty polls exceeds
     * {@code KafkaConsumerHelper.getMaxTimeOuts(...)}; any non-empty poll resets that
     * counter. The consumer is closed before the records are dumped and the result is
     * prepared, and it is also closed when polling, decoding or committing fails.
     *
     * @param kafkaServers          passed to {@code KafkaConsumerHelper.createConsumer}
     * @param topicName             topic handed to the consumer factory
     * @param requestJsonWithConfig request JSON from which the local consumer settings are read
     * @return the string produced by {@code KafkaConsumerHelper.prepareResult}
     * @throws IOException      declared for the helper calls this method delegates to
     * @throws RuntimeException if the effective record type is neither {@code "RAW"} nor {@code "JSON"}
     */
    public String receive(String kafkaServers, String topicName, String requestJsonWithConfig) throws IOException {
        ConsumerLocalConfigs consumerLocalConfigs = KafkaConsumerHelper.readConsumerLocalTestProperties(requestJsonWithConfig);
        ConsumerLocalConfigs effectiveLocal = KafkaConsumerHelper.deriveEffectiveConfigs(consumerLocalConfigs, this.consumerCommonConfigs);
        LOGGER.info("\n### Kafka Consumer Effective configs:{}\n", effectiveLocal);

        ArrayList<ConsumerRecord> rawRecords = new ArrayList<ConsumerRecord>();
        ArrayList<ConsumerJsonRecord> jsonRecords = new ArrayList<ConsumerJsonRecord>();

        // try-with-resources guarantees the consumer is released even when a helper
        // throws or the record type is unsupported; previously it leaked on any failure.
        try (Consumer consumer = KafkaConsumerHelper.createConsumer(kafkaServers, this.consumerPropertyFile, topicName)) {
            int noOfTimeOuts = 0;
            KafkaConsumerHelper.handleSeekOffset(effectiveLocal, consumer);
            while (true) {
                LOGGER.info("polling records  - noOfTimeOuts reached : {}", noOfTimeOuts);
                ConsumerRecords records = consumer.poll(Duration.ofMillis(KafkaConsumerHelper.getPollTime(effectiveLocal)));
                if (records.count() == 0) {
                    // An empty poll counts as one timeout; give up once the limit is exceeded.
                    if (++noOfTimeOuts > KafkaConsumerHelper.getMaxTimeOuts(effectiveLocal)) {
                        break;
                    }
                    continue;
                }
                LOGGER.info("Got {} records after {} timeouts\n", records.count(), noOfTimeOuts);
                // Only consecutive empty polls count towards the limit.
                noOfTimeOuts = 0;

                Iterator recordIterator = records.iterator();
                String recordType = effectiveLocal.getRecordType();
                LOGGER.info("Consumer chosen recordType: {}", recordType);
                switch (recordType) {
                    case "RAW": {
                        KafkaConsumerHelper.readRaw(rawRecords, recordIterator);
                        break;
                    }
                    case "JSON": {
                        KafkaConsumerHelper.readJson(jsonRecords, recordIterator);
                        break;
                    }
                    default: {
                        throw new RuntimeException("Unsupported record type - '" + recordType + "'. Supported values are 'JSON','RAW'");
                    }
                }
                KafkaConsumerHelper.handleCommitSyncAsync((Consumer<Long, String>)consumer, this.consumerCommonConfigs, effectiveLocal);
            }
        }

        KafkaFileRecordHelper.handleRecordsDump(effectiveLocal, rawRecords, jsonRecords);
        return KafkaConsumerHelper.prepareResult(effectiveLocal, jsonRecords, rawRecords);
    }
}

