/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * Supplies processors that join each incoming stream record against a table.
 *
 * <p>For every record with a non-null key, the processor looks the key up through
 * the table's value getter, hands the stream value and the looked-up value to the
 * {@link ValueJoiner}, and forwards the joiner's result under the original key.
 *
 * @param <K>  key type shared by the stream and the table
 * @param <R>  type produced by the joiner
 * @param <V1> value type of the stream records
 * @param <V2> value type returned by the table's value getter
 */
class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {

    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
    private final ValueJoiner<V1, V2, R> joiner;

    /**
     * @param table  table whose value getter supplier is used for lookups
     * @param joiner combines a stream value with the looked-up table value
     */
    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
        this.joiner = joiner;
        this.valueGetterSupplier = table.valueGetterSupplier();
    }

    /**
     * Creates a new join processor backed by a value getter freshly obtained
     * from the supplier.
     */
    @Override
    public Processor<K, V1> get() {
        final KTableValueGetter<K, V2> getter = valueGetterSupplier.get();
        return new KStreamKTableLeftJoinProcessor(getter);
    }

    /** Processor performing the per-record lookup, join, and forward. */
    private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {

        private final KTableValueGetter<K, V2> valueGetter;

        public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> getter) {
            this.valueGetter = getter;
        }

        /** Initializes this processor and then the value getter with the same context. */
        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            valueGetter.init(context);
        }

        /**
         * Joins one stream record against the table and forwards the result.
         * Records with a {@code null} key are dropped without any lookup.
         *
         * @param key   record key, also used for the table lookup
         * @param value stream-side value passed as the joiner's first argument
         */
        @Override
        public void process(K key, V1 value) {
            if (key == null) {
                // Nothing to look up for a keyless record; skip it.
                return;
            }

            final ProcessorContext ctx = context();
            // The looked-up value is handed to the joiner as-is
            // (presumably null when the table has no entry for the key -- verify against the getter).
            final V2 tableValue = valueGetter.get(key);
            final R joined = joiner.apply(value, tableValue);
            ctx.forward(key, joined);
        }
    }
}

