package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Arrays;
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.class */
public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionSendProcessorSupplier.class);
    private final ForeignKeyExtractor<K, V, KO> foreignKeyExtractor;
    private final Supplier<String> foreignKeySerdeTopicSupplier;
    private final Supplier<String> valueSerdeTopicSupplier;
    private final boolean leftJoin;
    private Serializer<KO> foreignKeySerializer;
    private Serializer<V> valueSerializer;
    private boolean useVersionedSemantics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier$UnbindChangeProcessor.class */
    public class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
        private Sensor droppedRecordsSensor;
        private String foreignKeySerdeTopic;
        private String valueSerdeTopic;
        private long[] recordHash;

        private UnbindChangeProcessor() {
        }

        @Override // org.apache.kafka.streams.processor.api.ContextualProcessor, org.apache.kafka.streams.processor.api.Processor
        public void init(ProcessorContext<KO, SubscriptionWrapper<K>> processorContext) {
            super.init(processorContext);
            this.foreignKeySerdeTopic = SubscriptionSendProcessorSupplier.this.foreignKeySerdeTopicSupplier.get();
            this.valueSerdeTopic = SubscriptionSendProcessorSupplier.this.valueSerdeTopicSupplier.get();
            if (SubscriptionSendProcessorSupplier.this.foreignKeySerializer == null) {
                SubscriptionSendProcessorSupplier.this.foreignKeySerializer = processorContext.keySerde().serializer();
            }
            if (SubscriptionSendProcessorSupplier.this.valueSerializer == null) {
                SubscriptionSendProcessorSupplier.this.valueSerializer = processorContext.valueSerde().serializer();
            }
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), (StreamsMetricsImpl) processorContext.metrics());
        }

        @Override // org.apache.kafka.streams.processor.api.Processor
        public void process(Record<K, Change<V>> record) {
            this.recordHash = null;
            if (SubscriptionSendProcessorSupplier.this.useVersionedSemantics && !record.value().isLatest) {
                SubscriptionSendProcessorSupplier.LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
                this.droppedRecordsSensor.record();
            } else if (SubscriptionSendProcessorSupplier.this.leftJoin) {
                leftJoinInstructions(record);
            } else {
                defaultJoinInstructions(record);
            }
        }

        private void leftJoinInstructions(Record<K, Change<V>> record) {
            if (record.value().oldValue == null) {
                if (record.value().newValue != null) {
                    forward(record, SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().newValue), SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                }
            } else {
                KO extract = SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().oldValue);
                KO extract2 = record.value().newValue == null ? null : SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().newValue);
                if (extract != null && !Arrays.equals(serialize(extract2), serialize(extract))) {
                    forward(record, extract, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE);
                }
                forward(record, extract2, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
            }
        }

        private void defaultJoinInstructions(Record<K, Change<V>> record) {
            if (record.value().oldValue == null) {
                if (record.value().newValue != null) {
                    KO extract = SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().newValue);
                    if (extract == null) {
                        logSkippedRecordDueToNullForeignKey();
                        return;
                    } else {
                        forward(record, extract, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                        return;
                    }
                }
                return;
            }
            KO extract2 = record.value().oldValue == null ? null : SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().oldValue);
            if (extract2 == null) {
                logSkippedRecordDueToNullForeignKey();
                return;
            }
            if (record.value().newValue == null) {
                forward(record, extract2, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE);
                return;
            }
            KO extract3 = record.value().newValue == null ? null : SubscriptionSendProcessorSupplier.this.foreignKeyExtractor.extract(record.key(), record.value().newValue);
            if (extract3 == null) {
                logSkippedRecordDueToNullForeignKey();
                return;
            }
            if (!Arrays.equals(serialize(extract3), serialize(extract2))) {
                forward(record, extract2, SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE);
            }
            forward(record, extract3, SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
        }

        private byte[] serialize(KO ko) {
            return SubscriptionSendProcessorSupplier.this.foreignKeySerializer.serialize(this.foreignKeySerdeTopic, ko);
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void forward(Record<K, Change<V>> record, KO ko, SubscriptionWrapper.Instruction instruction) {
            context().forward(record.withKey(ko).withValue(new SubscriptionWrapper(hash(record), instruction, record.key(), Integer.valueOf(context().recordMetadata().get().partition()))));
        }

        private long[] hash(Record<K, Change<V>> record) {
            if (this.recordHash == null) {
                this.recordHash = record.value().newValue == null ? null : Murmur3.hash128(SubscriptionSendProcessorSupplier.this.valueSerializer.serialize(this.valueSerdeTopic, record.value().newValue));
            }
            return this.recordHash;
        }

        private void logSkippedRecordDueToNullForeignKey() {
            if (context().recordMetadata().isPresent()) {
                RecordMetadata recordMetadata = context().recordMetadata().get();
                SubscriptionSendProcessorSupplier.LOG.warn("Skipping record due to null foreign key. topic=[{}] partition=[{}] offset=[{}]", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
            } else {
                SubscriptionSendProcessorSupplier.LOG.warn("Skipping record due to null foreign key. Topic, partition, and offset not known.");
            }
            this.droppedRecordsSensor.record();
        }
    }

    public SubscriptionSendProcessorSupplier(ForeignKeyExtractor<K, V, KO> foreignKeyExtractor, Supplier<String> supplier, Supplier<String> supplier2, Serde<KO> serde, Serializer<V> serializer, boolean z) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.foreignKeySerdeTopicSupplier = supplier;
        this.valueSerdeTopicSupplier = supplier2;
        this.valueSerializer = serializer;
        this.leftJoin = z;
        this.foreignKeySerializer = serde == null ? null : serde.serializer();
    }

    @Override // org.apache.kafka.streams.processor.api.ProcessorSupplier, java.util.function.Supplier
    public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
        return new UnbindChangeProcessor();
    }

    public void setUseVersionedSemantics(boolean z) {
        this.useVersionedSemantics = z;
    }

    public boolean isUseVersionedSemantics() {
        return this.useVersionedSemantics;
    }
}
