package org.apache.flume.serialization;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.commons.codec.binary.Hex;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.common.Util;

/* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/serialization/AvroEventDeserializer.class */
public class AvroEventDeserializer implements EventDeserializer {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AvroEventDeserializer.class);
    private final AvroSchemaType schemaType;
    private final ResettableInputStream ris;
    private Schema schema;
    private byte[] schemaHash;
    private String schemaHashString;
    private DataFileReader<GenericRecord> fileReader;
    private GenericDatumWriter datumWriter;
    private GenericRecord record;
    private ByteArrayOutputStream out;
    private BinaryEncoder encoder;
    public static final String CONFIG_SCHEMA_TYPE_KEY = "schemaType";
    public static final String AVRO_SCHEMA_HEADER_HASH = "flume.avro.schema.hash";
    public static final String AVRO_SCHEMA_HEADER_LITERAL = "flume.avro.schema.literal";

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/serialization/AvroEventDeserializer$AvroSchemaType.class */
    public enum AvroSchemaType {
        HASH,
        LITERAL
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/serialization/AvroEventDeserializer$Builder.class */
    public static class Builder implements EventDeserializer.Builder {
        @Override // org.apache.flume.serialization.EventDeserializer.Builder
        public EventDeserializer build(Context context, ResettableInputStream resettableInputStream) {
            if (!(resettableInputStream instanceof RemoteMarkable)) {
                throw new IllegalArgumentException("Cannot use this deserializer without a RemoteMarkable input stream");
            }
            AvroEventDeserializer avroEventDeserializer = new AvroEventDeserializer(context, resettableInputStream);
            try {
                avroEventDeserializer.initialize();
                return avroEventDeserializer;
            } catch (Exception e) {
                throw new FlumeException("Cannot instantiate deserializer", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/serialization/AvroEventDeserializer$SeekableResettableInputBridge.class */
    public static class SeekableResettableInputBridge implements SeekableInput {
        ResettableInputStream ris;

        public SeekableResettableInputBridge(ResettableInputStream resettableInputStream) {
            this.ris = resettableInputStream;
        }

        @Override // org.apache.avro.file.SeekableInput
        public void seek(long j) throws IOException {
            this.ris.seek(j);
        }

        @Override // org.apache.avro.file.SeekableInput
        public long tell() throws IOException {
            return this.ris.tell();
        }

        @Override // org.apache.avro.file.SeekableInput
        public long length() throws IOException {
            return this.ris instanceof LengthMeasurable ? ((LengthMeasurable) this.ris).length() : Util.VLI_MAX;
        }

        @Override // org.apache.avro.file.SeekableInput
        public int read(byte[] bArr, int i, int i2) throws IOException {
            return this.ris.read(bArr, i, i2);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.ris.close();
        }
    }

    private AvroEventDeserializer(Context context, ResettableInputStream resettableInputStream) {
        this.ris = resettableInputStream;
        this.schemaType = AvroSchemaType.valueOf(context.getString(CONFIG_SCHEMA_TYPE_KEY, AvroSchemaType.HASH.toString()).toUpperCase(Locale.ENGLISH));
        if (this.schemaType == AvroSchemaType.LITERAL) {
            logger.warn("schemaType set to " + AvroSchemaType.LITERAL.toString() + ", so storing full Avro schema in the header of each event, which may be inefficient. Consider using the hash of the schema instead of the literal schema.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initialize() throws IOException, NoSuchAlgorithmException {
        SeekableResettableInputBridge seekableResettableInputBridge = new SeekableResettableInputBridge(this.ris);
        long tell = seekableResettableInputBridge.tell();
        seekableResettableInputBridge.seek(0L);
        this.fileReader = new DataFileReader<>(seekableResettableInputBridge, new GenericDatumReader());
        this.fileReader.sync(tell);
        this.schema = this.fileReader.getSchema();
        this.datumWriter = new GenericDatumWriter(this.schema);
        this.out = new ByteArrayOutputStream();
        this.encoder = EncoderFactory.get().binaryEncoder(this.out, this.encoder);
        this.schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", this.schema);
        this.schemaHashString = Hex.encodeHexString(this.schemaHash);
    }

    @Override // org.apache.flume.serialization.EventDeserializer
    public Event readEvent() throws IOException {
        if (!this.fileReader.hasNext()) {
            return null;
        }
        this.record = this.fileReader.next(this.record);
        this.out.reset();
        this.datumWriter.write(this.record, this.encoder);
        this.encoder.flush();
        Event withBody = EventBuilder.withBody(this.out.toByteArray());
        if (this.schemaType == AvroSchemaType.HASH) {
            withBody.getHeaders().put(AVRO_SCHEMA_HEADER_HASH, this.schemaHashString);
        } else {
            withBody.getHeaders().put("flume.avro.schema.literal", this.schema.toString());
        }
        return withBody;
    }

    @Override // org.apache.flume.serialization.EventDeserializer
    public List<Event> readEvents(int i) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < i && this.fileReader.hasNext(); i2++) {
            Event readEvent = readEvent();
            if (readEvent != null) {
                newArrayList.add(readEvent);
            }
        }
        return newArrayList;
    }

    @Override // org.apache.flume.serialization.EventDeserializer, org.apache.flume.serialization.Resettable
    public void mark() throws IOException {
        long previousSync = this.fileReader.previousSync() - 16;
        if (previousSync < 0) {
            previousSync = 0;
        }
        ((RemoteMarkable) this.ris).markPosition(previousSync);
    }

    @Override // org.apache.flume.serialization.EventDeserializer, org.apache.flume.serialization.Resettable
    public void reset() throws IOException {
        this.fileReader.sync(((RemoteMarkable) this.ris).getMarkPosition());
    }

    @Override // org.apache.flume.serialization.EventDeserializer, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.ris.close();
    }
}
