package org.apache.flink.connector.pulsar.source.reader;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;

/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.class */
/**
 * {@link RecordEmitter} that hands each Pulsar {@link Message} to the configured
 * {@link PulsarDeserializationSchema} and forwards the deserialized records to the
 * Flink {@link SourceOutput}.
 *
 * <p>A single {@link SourceOutputWrapper} instance is reused for every call to
 * {@link #emitRecord}; it is re-pointed at the current output and message timestamp
 * before each deserialization.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
public class PulsarRecordEmitter<T> implements RecordEmitter<Message<byte[]>, T, PulsarPartitionSplitState> {
    private final PulsarDeserializationSchema<T> deserializationSchema;
    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();

    /**
     * Adapts a {@link SourceOutput} to the {@link Collector} interface expected by the
     * deserialization schema, attaching the current message's event time when one is set.
     *
     * <p>NOTE(review): the decompiler reported that the access modifiers of this class and
     * of its two setters were widened from {@code private}; they are kept as decompiled
     * here so that the visible interface is unchanged.
     *
     * @param <T> type of the collected records
     */
    public static class SourceOutputWrapper<T> implements Collector<T> {
        private SourceOutput<T> sourceOutput;
        private long timestamp;

        private SourceOutputWrapper() {
        }

        /**
         * Forwards the record to the wrapped output. The stored timestamp is passed along
         * only when it is strictly positive; otherwise the record is emitted without an
         * explicit timestamp.
         *
         * @param t the record to emit
         */
        @Override
        public void collect(T t) {
            if (this.timestamp > 0) {
                this.sourceOutput.collect(t, this.timestamp);
            } else {
                this.sourceOutput.collect(t);
            }
        }

        /** No-op: this wrapper holds nothing that needs closing. */
        @Override
        public void close() {
        }

        /**
         * Sets the output that subsequent {@link #collect} calls forward to.
         *
         * @param sourceOutput the output for the record currently being emitted
         */
        public void setSourceOutput(SourceOutput<T> sourceOutput) {
            this.sourceOutput = sourceOutput;
        }

        /**
         * Stores the event time of the given message for subsequent {@link #collect} calls.
         *
         * @param message the message whose {@code getEventTime()} value is recorded
         */
        public void setTimestamp(Message<?> message) {
            this.timestamp = message.getEventTime();
        }
    }

    /**
     * Creates an emitter that deserializes messages with the given schema.
     *
     * @param pulsarDeserializationSchema schema used to turn messages into records
     */
    public PulsarRecordEmitter(PulsarDeserializationSchema<T> pulsarDeserializationSchema) {
        this.deserializationSchema = pulsarDeserializationSchema;
    }

    /**
     * Deserializes one message into the given output and records its message id as the
     * latest consumed position of the split.
     *
     * <p>The message is released in a {@code finally} block so that it is released even
     * when deserialization or the split-state update throws. The split state is updated
     * only after deserialization has completed without an exception.
     *
     * @param message the Pulsar message to deserialize
     * @param sourceOutput the output receiving the deserialized records
     * @param pulsarPartitionSplitState state of the split the message belongs to
     * @throws Exception whatever the deserialization schema or the split-state update throws
     */
    @Override // org.apache.flink.connector.base.source.reader.RecordEmitter
    public void emitRecord(Message<byte[]> message, SourceOutput<T> sourceOutput, PulsarPartitionSplitState pulsarPartitionSplitState) throws Exception {
        this.sourceOutputWrapper.setSourceOutput(sourceOutput);
        this.sourceOutputWrapper.setTimestamp(message);
        try {
            this.deserializationSchema.deserialize(message, this.sourceOutputWrapper);
            pulsarPartitionSplitState.setLatestConsumedId(message.getMessageId());
        } finally {
            // Release on every path; presumably this hands pooled message buffers back to
            // the Pulsar client — TODO confirm against the Pulsar Message#release docs.
            message.release();
        }
    }
}
