package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSource.class */
/**
 * Processor supplier whose processors handle each incoming record as follows:
 * <ul>
 *   <li>records with a {@code null} key are logged, counted on the dropped-records sensor, and skipped;</li>
 *   <li>when no queryable name is set, the record is forwarded directly, wrapped in a {@link Change}
 *       whose old value is {@code null};</li>
 *   <li>otherwise the record is written to the timestamped key-value store registered under the
 *       queryable name and handed to a {@link TimestampedTupleForwarder} together with the value
 *       previously stored for that key.</li>
 * </ul>
 *
 * @param <KIn> key type of the incoming (and forwarded) records
 * @param <VIn> value type of the incoming records
 */
public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn, Change<VIn>> {
    private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);

    private final String storeName;
    private String queryableName;
    private boolean sendOldValues;

    /**
     * Creates a supplier with old-value sending initially disabled.
     *
     * @param storeName     name later used as the queryable name by {@link #materialize()} and
     *                      {@link #enableSendingOldValues()}; must not be {@code null}
     * @param queryableName initial queryable name; may be {@code null}
     * @throws NullPointerException if {@code storeName} is {@code null}
     */
    public KTableSource(final String storeName, final String queryableName) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        this.storeName = storeName;
        this.queryableName = queryableName;
        this.sendOldValues = false;
    }

    /**
     * @return the current queryable name, or {@code null} if none has been set
     */
    public String queryableName() {
        return queryableName;
    }

    /**
     * @return a new processor instance bound to this supplier
     */
    @Override
    public Processor<KIn, VIn, KIn, Change<VIn>> get() {
        return new KTableSourceProcessor();
    }

    /**
     * Turns on the send-old-values flag and sets the queryable name to the store name.
     */
    public void enableSendingOldValues() {
        sendOldValues = true;
        queryableName = storeName;
    }

    /**
     * Sets the queryable name to the store name.
     */
    public void materialize() {
        queryableName = storeName;
    }

    /**
     * @return {@code true} if a queryable name is currently set
     */
    public boolean materialized() {
        return queryableName != null;
    }

    /**
     * Processor created by {@link KTableSource#get()}; reads the enclosing supplier's
     * {@code queryableName} and {@code sendOldValues} fields.
     */
    public class KTableSourceProcessor implements Processor<KIn, VIn, KIn, Change<VIn>> {
        private ProcessorContext<KIn, Change<VIn>> context;
        private TimestampedKeyValueStore<KIn, VIn> store;
        private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
        private Sensor droppedRecordsSensor;

        private KTableSourceProcessor() {
        }

        /**
         * Captures the context, obtains the dropped-records sensor for the current thread and task,
         * and, if a queryable name is set, looks up the state store of that name and creates the
         * tuple forwarder on top of it.
         *
         * @param context the processor context supplied by the runtime
         */
        @Override
        public void init(final ProcessorContext<KIn, Change<VIn>> context) {
            this.context = context;

            final StreamsMetricsImpl streamsMetrics = (StreamsMetricsImpl) context.metrics();
            final String threadName = Thread.currentThread().getName();
            final String taskName = context.taskId().toString();
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadName, taskName, streamsMetrics);

            if (queryableName == null) {
                // No store configured: process() forwards records directly.
                return;
            }
            store = context.getStateStore(queryableName);
            tupleForwarder = new TimestampedTupleForwarder<>(
                store,
                context,
                new TimestampedCacheFlushListener<>(context),
                sendOldValues);
        }

        /**
         * Handles a single record; see the enclosing class description for the three cases.
         *
         * @param record the record to process
         */
        @Override
        public void process(final Record<KIn, VIn> record) {
            final KIn key = record.key();
            if (key == null) {
                logNullKeySkip();
                droppedRecordsSensor.record();
                return;
            }

            final VIn newValue = record.value();
            if (queryableName == null) {
                context.forward(record.withValue(new Change<>(newValue, null)));
                return;
            }

            final ValueAndTimestamp<VIn> previous = store.get(key);
            final VIn oldValue = previous == null ? null : previous.value();
            if (previous != null && record.timestamp() < previous.timestamp()) {
                // Only a warning: the update is still written and forwarded below.
                logOutOfOrderUpdate(previous.timestamp(), record.timestamp());
            }

            store.put(key, ValueAndTimestamp.make(newValue, record.timestamp()));
            tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue)));
        }

        /** Warns that a null-key record is being skipped, with its coordinates when available. */
        private void logNullKeySkip() {
            if (context.recordMetadata().isPresent()) {
                final RecordMetadata metadata = context.recordMetadata().get();
                LOG.warn(
                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                    metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
            }
        }

        /**
         * Warns that the incoming record's timestamp is lower than the stored one for the same key.
         *
         * @param storedTimestamp   timestamp of the value currently in the store
         * @param incomingTimestamp timestamp of the record being processed
         */
        private void logOutOfOrderUpdate(final long storedTimestamp, final long incomingTimestamp) {
            if (context.recordMetadata().isPresent()) {
                final RecordMetadata metadata = context.recordMetadata().get();
                LOG.warn(
                    "Detected out-of-order KTable update for {}, old timestamp=[{}] new timestamp=[{}]. topic=[{}] partition=[{}] offset=[{}].",
                    store.name(), storedTimestamp, incomingTimestamp,
                    metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                LOG.warn(
                    "Detected out-of-order KTable update for {}, old timestamp=[{}] new timestamp=[{}]. Topic, partition and offset not known.",
                    store.name(), storedTimestamp, incomingTimestamp);
            }
        }
    }
}
