/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.formatter;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import kafka.common.KafkaException;
import kafka.common.MessageReader;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;

public class AvroMessageReader
extends AbstractKafkaAvroSerializer
implements MessageReader {
    private String topic = null;
    private BufferedReader reader = null;
    private Boolean parseKey = false;
    private String keySeparator = "\t";
    private boolean ignoreError = false;
    private final DecoderFactory decoderFactory = DecoderFactory.get();
    private Schema keySchema = null;
    private Schema valueSchema = null;
    private String keySubject = null;
    private String valueSubject = null;

    public AvroMessageReader() {
    }

    AvroMessageReader(SchemaRegistryClient schemaRegistryClient, Schema keySchema, Schema valueSchema, String topic, boolean parseKey, BufferedReader reader) {
        this.schemaRegistry = schemaRegistryClient;
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
        this.topic = topic;
        this.keySubject = topic + "-key";
        this.valueSubject = topic + "-value";
        this.parseKey = parseKey;
        this.reader = reader;
    }

    public void init(InputStream inputStream, Properties props) {
        this.topic = props.getProperty("topic");
        if (props.containsKey("parse.key")) {
            this.parseKey = props.getProperty("parse.key").trim().toLowerCase().equals("true");
        }
        if (props.containsKey("key.separator")) {
            this.keySeparator = props.getProperty("key.separator");
        }
        if (props.containsKey("ignore.error")) {
            this.ignoreError = props.getProperty("ignore.error").trim().toLowerCase().equals("true");
        }
        this.reader = new BufferedReader(new InputStreamReader(inputStream));
        String url = props.getProperty("schema.registry.url");
        if (url == null) {
            throw new ConfigException("Missing schema registry url!");
        }
        this.schemaRegistry = new CachedSchemaRegistryClient(url, 1000);
        if (!props.containsKey("value.schema")) {
            throw new ConfigException("Must provide the Avro schema string in value.schema");
        }
        String valueSchemaString = props.getProperty("value.schema");
        Schema.Parser parser = new Schema.Parser();
        this.valueSchema = parser.parse(valueSchemaString);
        if (this.parseKey.booleanValue()) {
            if (!props.containsKey("key.schema")) {
                throw new ConfigException("Must provide the Avro schema string in key.schema");
            }
            String keySchemaString = props.getProperty("key.schema");
            this.keySchema = parser.parse(keySchemaString);
        }
        this.keySubject = this.topic + "-key";
        this.valueSubject = this.topic + "-value";
    }

    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String line = this.reader.readLine();
            if (line == null) {
                return null;
            }
            if (!this.parseKey.booleanValue()) {
                Object value = this.jsonToAvro(line, this.valueSchema);
                byte[] serializedValue = this.serializeImpl(this.valueSubject, value);
                return new ProducerRecord(this.topic, (Object)serializedValue);
            }
            int keyIndex = line.indexOf(this.keySeparator);
            if (keyIndex < 0) {
                if (this.ignoreError) {
                    Object value = this.jsonToAvro(line, this.valueSchema);
                    byte[] serializedValue = this.serializeImpl(this.valueSubject, value);
                    return new ProducerRecord(this.topic, (Object)serializedValue);
                }
                throw new KafkaException("No key found in line " + line);
            }
            String keyString = line.substring(0, keyIndex);
            String valueString = keyIndex + this.keySeparator.length() > line.length() ? "" : line.substring(keyIndex + this.keySeparator.length());
            Object key = this.jsonToAvro(keyString, this.keySchema);
            byte[] serializedKey = this.serializeImpl(this.keySubject, key);
            Object value = this.jsonToAvro(valueString, this.valueSchema);
            byte[] serializedValue = this.serializeImpl(this.valueSubject, value);
            return new ProducerRecord(this.topic, (Object)serializedKey, (Object)serializedValue);
        }
        catch (IOException e) {
            throw new KafkaException("Error reading from input", (Throwable)e);
        }
    }

    private Object jsonToAvro(String jsonString, Schema schema) {
        try {
            GenericDatumReader reader = new GenericDatumReader(schema);
            Object object = reader.read(null, (Decoder)this.decoderFactory.jsonDecoder(schema, jsonString));
            if (schema.getType().equals((Object)Schema.Type.STRING)) {
                object = ((Utf8)object).toString();
            }
            return object;
        }
        catch (IOException e) {
            throw new SerializationException(String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), (Throwable)e);
        }
        catch (AvroRuntimeException e) {
            throw new SerializationException(String.format("Error deserializing json %s to Avro of schema %s", jsonString, schema), (Throwable)e);
        }
    }

    public void close() {
    }
}

