/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Record emitter that turns raw Kafka {@link ConsumerRecord}s into elements of type {@code T}.
 *
 * <p>Each record is handed to the configured {@link KafkaRecordDeserializationSchema}; every
 * element the schema produces is forwarded to the {@link SourceOutput} together with the
 * timestamp of the originating Kafka record. After a record has been deserialized, the split
 * state's current offset is moved to the offset following that record.
 *
 * <p>A single collector wrapper instance is reused for all calls and is re-pointed at the
 * given output on every invocation, so an instance of this class must not be used by several
 * threads at the same time.
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
@Internal
public class KafkaRecordEmitter<T>
implements RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> {
    /** Schema used to convert the raw key/value bytes into output elements. */
    private final KafkaRecordDeserializationSchema<T> deserializationSchema;

    /** Reusable adapter that exposes the current {@link SourceOutput} as a {@link Collector}. */
    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();

    /**
     * Creates an emitter that deserializes records with the given schema.
     *
     * @param deserializationSchema schema applied to every consumed record
     */
    public KafkaRecordEmitter(KafkaRecordDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    /**
     * Deserializes one Kafka record, emits the resulting elements and advances the split offset.
     *
     * @param consumerRecord the raw record fetched from Kafka
     * @param output the output that receives the deserialized elements
     * @param splitState state of the split the record belongs to; its current offset is set to
     *     {@code consumerRecord.offset() + 1} once deserialization has completed
     * @throws IOException if anything in the emit sequence fails; the original exception is
     *     attached as the cause
     */
    public void emitRecord(ConsumerRecord<byte[], byte[]> consumerRecord, SourceOutput<T> output, KafkaPartitionSplitState splitState) throws Exception {
        try {
            this.sourceOutputWrapper.setSourceOutput(output);
            this.sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
            this.deserializationSchema.deserialize(consumerRecord, this.sourceOutputWrapper);
            // Only reached when deserialization succeeded: the next record to read is offset + 1.
            splitState.setCurrentOffset(consumerRecord.offset() + 1L);
        }
        catch (Exception e) {
            // Identify the offending record by its coordinates only; the payload is deliberately
            // not included. The null guard keeps this handler from throwing on its own.
            String recordCoordinates = consumerRecord == null
                    ? "record=null"
                    : "topic=" + consumerRecord.topic()
                            + ", partition=" + consumerRecord.partition()
                            + ", offset=" + consumerRecord.offset();
            throw new IOException("Failed to deserialize consumer record (" + recordCoordinates + ")", e);
        }
    }

    /**
     * {@link Collector} adapter that forwards collected elements to a {@link SourceOutput},
     * attaching the timestamp that was set for the record currently being processed.
     *
     * @param <T> type of the collected elements
     */
    private static class SourceOutputWrapper<T>
    implements Collector<T> {
        /** Output that receives the collected elements; replaced before each record. */
        private SourceOutput<T> sourceOutput;

        /** Timestamp attached to every element collected for the current record. */
        private long timestamp;

        private SourceOutputWrapper() {
        }

        /**
         * Forwards the element to the current output with the current timestamp.
         *
         * @param record the deserialized element to emit
         */
        public void collect(T record) {
            this.sourceOutput.collect(record, this.timestamp);
        }

        /** Does nothing; this wrapper holds no resources of its own. */
        public void close() {
        }

        /** Sets the output that subsequent {@link #collect(Object)} calls forward to. */
        private void setSourceOutput(SourceOutput<T> sourceOutput) {
            this.sourceOutput = sourceOutput;
        }

        /** Sets the timestamp attached to subsequently collected elements. */
        private void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }
}

