/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KGroupedTableImpl;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin;
import org.apache.kafka.streams.kstream.internals.KTableMapValues;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableRepartitionMap;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KTableTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.TableJoinedInternal;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerde;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSuppressNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KTableImpl<K, S, V>
extends AbstractStream<K, V>
implements KTable<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(KTableImpl.class);
    static final String SOURCE_NAME = "KTABLE-SOURCE-";
    static final String STATE_STORE_NAME = "STATE-STORE-";
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    private static final String MERGE_NAME = "KTABLE-MERGE-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
    private static final String FK_JOIN = "KTABLE-FK-JOIN-";
    private static final String FK_JOIN_STATE_STORE_NAME = "KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-";
    private static final String SUBSCRIPTION_REGISTRATION = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-";
    private static final String SUBSCRIPTION_RESPONSE = "KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-";
    private static final String SUBSCRIPTION_PROCESSOR = "KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-";
    private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = "KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
    private static final String FK_JOIN_OUTPUT_NAME = "KTABLE-FK-JOIN-OUTPUT-";
    private static final String TOPIC_SUFFIX = "-topic";
    private static final String SINK_NAME = "KTABLE-SINK-";
    private final Object processorSupplier;
    private final String queryableStoreName;
    private boolean sendOldValues = false;
    private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
        if (!maybeMulticastPartitions.isPresent()) {
            return null;
        }
        if (((Set)maybeMulticastPartitions.get()).size() != 1) {
            throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set");
        }
        return (Integer)((Set)maybeMulticastPartitions.get()).iterator().next();
    };

    public KTableImpl(String name, Serde<K> keySerde, Serde<V> valueSerde, Set<String> subTopologySourceNodes, String queryableStoreName, org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> processorSupplier, GraphNode graphNode, InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
        this.processorSupplier = processorSupplier;
        this.queryableStoreName = queryableStoreName;
    }

    public KTableImpl(String name, Serde<K> keySerde, Serde<V> valueSerde, Set<String> subTopologySourceNodes, String queryableStoreName, ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier, GraphNode graphNode, InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
        this.processorSupplier = newProcessorSupplier;
        this.queryableStoreName = queryableStoreName;
    }

    @Override
    public String queryableStoreName() {
        return this.queryableStoreName;
    }

    private KTable<K, V> doFilter(Predicate<? super K, ? super V> predicate, Named named, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean filterNot) {
        StoreBuilder<?> storeBuilder;
        String queryableStoreName;
        Serde valueSerde;
        Serde keySerde;
        if (materializedInternal != null) {
            if (materializedInternal.storeName() == null) {
                this.builder.newStoreName(FILTER_NAME);
            }
            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
            valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valueSerde;
            queryableStoreName = materializedInternal.queryableStoreName();
            storeBuilder = queryableStoreName != null ? new KeyValueStoreMaterializer<K, V>(materializedInternal).materialize() : null;
        } else {
            keySerde = this.keySerde;
            valueSerde = this.valueSerde;
            queryableStoreName = null;
            storeBuilder = null;
        }
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, FILTER_NAME);
        KTableFilter<? super K, ? super V> processorSupplier = new KTableFilter<K, V>(this, predicate, filterNot, queryableStoreName);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(processorSupplier, name));
        TableFilterNode tableNode = new TableFilterNode(name, processorParameters, storeBuilder);
        KTableImpl.maybeSetOutputVersioned(tableNode, materializedInternal);
        this.builder.addGraphNode(this.graphNode, tableNode);
        return new KTableImpl<K, S, V>(name, keySerde, valueSerde, (Set<String>)this.subTopologySourceNodes, queryableStoreName, processorSupplier, tableNode, this.builder);
    }

    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, NamedInternal.empty(), null, false);
    }

    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, named, null, false);
    }

    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized);
        return this.doFilter(predicate, named, materializedInternal, false);
    }

    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.filter(predicate, NamedInternal.empty(), materialized);
    }

    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, NamedInternal.empty(), null, true);
    }

    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        return this.doFilter(predicate, named, null, true);
    }

    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.filterNot(predicate, NamedInternal.empty(), materialized);
    }

    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized);
        NamedInternal renamed = new NamedInternal(named);
        return this.doFilter(predicate, renamed, materializedInternal, true);
    }

    private <VR> KTable<K, VR> doMapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        StoreBuilder<?> storeBuilder;
        String queryableStoreName;
        Serde<VR> valueSerde;
        Serde keySerde;
        if (materializedInternal != null) {
            if (materializedInternal.storeName() == null) {
                this.builder.newStoreName(MAPVALUES_NAME);
            }
            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
            valueSerde = materializedInternal.valueSerde();
            queryableStoreName = materializedInternal.queryableStoreName();
            storeBuilder = queryableStoreName != null ? new KeyValueStoreMaterializer<K, VR>(materializedInternal).materialize() : null;
        } else {
            keySerde = this.keySerde;
            valueSerde = null;
            queryableStoreName = null;
            storeBuilder = null;
        }
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, MAPVALUES_NAME);
        KTableMapValues<? super K, ? super V, ? extends VR> processorSupplier = new KTableMapValues<K, V, VR>(this, mapper, queryableStoreName);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(processorSupplier, name));
        TableProcessorNode tableNode = new TableProcessorNode(name, processorParameters, storeBuilder);
        KTableImpl.maybeSetOutputVersioned(tableNode, materializedInternal);
        this.builder.addGraphNode(this.graphNode, tableNode);
        return new KTableImpl<K, S, VR>(name, keySerde, valueSerde, (Set<String>)this.subTopologySourceNodes, queryableStoreName, processorSupplier, tableNode, this.builder);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(KTableImpl.withKey(mapper), NamedInternal.empty(), null);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(KTableImpl.withKey(mapper), named, null);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(mapper, NamedInternal.empty(), null);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return this.doMapValues(mapper, named, null);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.mapValues(mapper, (Named)NamedInternal.empty(), materialized);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized);
        return this.doMapValues(KTableImpl.withKey(mapper), named, materializedInternal);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.mapValues(mapper, (Named)NamedInternal.empty(), materialized);
    }

    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized);
        return this.doMapValues(mapper, named, materializedInternal);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, String ... stateStoreNames) {
        return this.doTransformValues(transformerSupplier, null, NamedInternal.empty(), stateStoreNames);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(named, "processorName can't be null");
        return this.doTransformValues(transformerSupplier, null, new NamedInternal(named), stateStoreNames);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, String ... stateStoreNames) {
        return this.transformValues(transformerSupplier, materialized, NamedInternal.empty(), stateStoreNames);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        Objects.requireNonNull(named, "named can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized);
        return this.doTransformValues(transformerSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
    }

    private <VR> KTable<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, NamedInternal namedInternal, String ... stateStoreNames) {
        StoreBuilder<?> storeBuilder;
        String queryableStoreName;
        Serde<VR> valueSerde;
        Serde keySerde;
        Objects.requireNonNull(stateStoreNames, "stateStoreNames");
        if (materializedInternal != null) {
            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
            valueSerde = materializedInternal.valueSerde();
            queryableStoreName = materializedInternal.queryableStoreName();
            storeBuilder = queryableStoreName != null ? new KeyValueStoreMaterializer<K, VR>(materializedInternal).materialize() : null;
        } else {
            keySerde = this.keySerde;
            valueSerde = null;
            queryableStoreName = null;
            storeBuilder = null;
        }
        String name = namedInternal.orElseGenerateWithPrefix(this.builder, TRANSFORMVALUES_NAME);
        KTableTransformValues<? super K, ? super V, ? extends VR> processorSupplier = new KTableTransformValues<K, V, VR>(this, transformerSupplier, queryableStoreName);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(processorSupplier, name));
        TableProcessorNode tableNode = new TableProcessorNode(name, processorParameters, storeBuilder, stateStoreNames);
        KTableImpl.maybeSetOutputVersioned(tableNode, materializedInternal);
        this.builder.addGraphNode(this.graphNode, tableNode);
        return new KTableImpl<K, S, VR>(name, keySerde, valueSerde, (Set<String>)this.subTopologySourceNodes, queryableStoreName, processorSupplier, tableNode, this.builder);
    }

    @Override
    public KStream<K, V> toStream() {
        return this.toStream(NamedInternal.empty());
    }

    @Override
    public KStream<K, V> toStream(Named named) {
        Objects.requireNonNull(named, "named can't be null");
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, TOSTREAM_NAME);
        KStreamMapValues<Object, Change, Object> kStreamMapValues = new KStreamMapValues<Object, Change, Object>((key, change) -> change.newValue);
        ProcessorParameters processorParameters = this.unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters(kStreamMapValues, name));
        ProcessorGraphNode toStreamNode = new ProcessorGraphNode(name, processorParameters);
        this.builder.addGraphNode(this.graphNode, toStreamNode);
        return new KStreamImpl(name, this.keySerde, this.valueSerde, this.subTopologySourceNodes, false, toStreamNode, this.builder);
    }

    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
        return this.toStream().selectKey(mapper);
    }

    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper, Named named) {
        return this.toStream(named).selectKey(mapper);
    }

    @Override
    public KTable<K, V> suppress(Suppressed<? super K> suppressed) {
        StoreBuilder storeBuilder;
        if (this.graphNode.isOutputVersioned().isPresent() && this.graphNode.isOutputVersioned().get().booleanValue()) {
            throw new TopologyException("suppress() is only supported for non-versioned KTables");
        }
        if (!(suppressed instanceof NamedSuppressed)) {
            throw new IllegalArgumentException("Custom subclasses of Suppressed are not supported.");
        }
        String givenName = ((NamedSuppressed)suppressed).name();
        String name = givenName != null ? givenName : this.builder.newProcessorName(SUPPRESS_NAME);
        SuppressedInternal<K> suppressedInternal = this.buildSuppress(suppressed, name);
        String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : this.builder.newStoreName(SUPPRESS_NAME);
        KTableSuppressProcessorSupplier suppressionSupplier = new KTableSuppressProcessorSupplier(suppressedInternal, storeName, this);
        if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
            Map<String, String> topicConfig = suppressedInternal.bufferConfig().getLogConfig();
            storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName, this.keySerde, this.valueSerde).withLoggingEnabled(topicConfig);
        } else {
            storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName, this.keySerde, this.valueSerde).withLoggingDisabled();
        }
        TableSuppressNode node = new TableSuppressNode(name, new ProcessorParameters(suppressionSupplier, name), storeBuilder);
        node.setOutputVersioned(false);
        this.builder.addGraphNode(this.graphNode, node);
        return new KTableImpl<K, S, V>(name, this.keySerde, this.valueSerde, Collections.singleton(this.name), null, suppressionSupplier, node, this.builder);
    }

    private SuppressedInternal<K> buildSuppress(Suppressed<? super K> suppress, String name) {
        if (suppress instanceof FinalResultsSuppressionBuilder) {
            long grace = GraphGraceSearchUtil.findAndVerifyWindowGrace(this.graphNode);
            LOG.info("Using grace period of [{}] as the suppress duration for node [{}].", (Object)Duration.ofMillis(grace), (Object)name);
            FinalResultsSuppressionBuilder builder = (FinalResultsSuppressionBuilder)suppress;
            SuppressedInternal finalResultsSuppression = builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
            return finalResultsSuppression;
        }
        if (suppress instanceof SuppressedInternal) {
            return (SuppressedInternal)suppress;
        }
        throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
    }

    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.doJoin(other, joiner, NamedInternal.empty(), null, false, false);
    }

    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, Named named) {
        return this.doJoin(other, joiner, named, null, false, false);
    }

    @Override
    public <VO, VR> KTable<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.join(other, joiner, NamedInternal.empty(), materialized);
    }

    @Override
    public <VO, VR> KTable<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, MERGE_NAME);
        return this.doJoin(other, joiner, named, materializedInternal, false, false);
    }

    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.outerJoin(other, joiner, (Named)NamedInternal.empty());
    }

    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, Named named) {
        return this.doJoin(other, joiner, named, null, true, true);
    }

    @Override
    public <VO, VR> KTable<K, VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.outerJoin(other, joiner, NamedInternal.empty(), materialized);
    }

    @Override
    public <VO, VR> KTable<K, VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, MERGE_NAME);
        return this.doJoin(other, joiner, named, materializedInternal, true, true);
    }

    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
        return this.leftJoin(other, joiner, (Named)NamedInternal.empty());
    }

    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, Named named) {
        return this.doJoin(other, joiner, named, null, true, false);
    }

    @Override
    public <VO, VR> KTable<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.leftJoin(other, joiner, NamedInternal.empty(), materialized);
    }

    @Override
    public <VO, VR> KTable<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, MERGE_NAME);
        return this.doJoin(other, joiner, named, materializedInternal, true, false);
    }

    private <VO, VR> KTable<K, VR> doJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named joinName, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean leftOuter, boolean rightOuter) {
        StoreBuilder<?> storeBuilder;
        String queryableStoreName;
        Serde<VR> valueSerde;
        Serde<K> keySerde;
        KTableKTableAbstractJoin joinOther;
        KTableKTableAbstractJoin joinThis;
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joinName, "joinName can't be null");
        NamedInternal renamed = new NamedInternal(joinName);
        String joinMergeName = renamed.orElseGenerateWithPrefix(this.builder, MERGE_NAME);
        Set<String> allSourceNodes = this.ensureCopartitionWith(Collections.singleton((AbstractStream)((Object)other)));
        if (leftOuter) {
            this.enableSendingOldValues(true);
        }
        if (rightOuter) {
            ((KTableImpl)other).enableSendingOldValues(true);
        }
        if (!leftOuter) {
            joinThis = new KTableKTableInnerJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableInnerJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        } else if (!rightOuter) {
            joinThis = new KTableKTableLeftJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableRightJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        } else {
            joinThis = new KTableKTableOuterJoin(this, (KTableImpl)other, joiner);
            joinOther = new KTableKTableOuterJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        }
        String joinThisName = renamed.suffixWithOrElseGet("-join-this", this.builder, JOINTHIS_NAME);
        String joinOtherName = renamed.suffixWithOrElseGet("-join-other", this.builder, JOINOTHER_NAME);
        ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
        ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
        if (materializedInternal != null) {
            if (materializedInternal.keySerde() == null) {
                materializedInternal.withKeySerde(this.keySerde);
            }
            keySerde = materializedInternal.keySerde();
            valueSerde = materializedInternal.valueSerde();
            queryableStoreName = materializedInternal.storeName();
            storeBuilder = new KeyValueStoreMaterializer<K, VR>(materializedInternal).materialize();
        } else {
            keySerde = this.keySerde;
            valueSerde = null;
            queryableStoreName = null;
            storeBuilder = null;
        }
        KTableKTableJoinNode kTableKTableJoinNode = KTableKTableJoinNode.kTableKTableJoinNodeBuilder().withNodeName(joinMergeName).withJoinThisProcessorParameters(joinThisProcessorParameters).withJoinOtherProcessorParameters(joinOtherProcessorParameters).withThisJoinSideNodeName(this.name).withOtherJoinSideNodeName(((KTableImpl)other).name).withJoinThisStoreNames(this.valueGetterSupplier().storeNames()).withJoinOtherStoreNames(((KTableImpl)other).valueGetterSupplier().storeNames()).withKeySerde(keySerde).withValueSerde(valueSerde).withQueryableStoreName(queryableStoreName).withStoreBuilder(storeBuilder).build();
        boolean isOutputVersioned = materializedInternal != null && materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier;
        kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
        this.builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
        this.builder.addGraphNode(((KTableImpl)other).graphNode, kTableKTableJoinNode);
        return new KTableImpl(kTableKTableJoinNode.nodeName(), kTableKTableJoinNode.keySerde(), kTableKTableJoinNode.valueSerde(), allSourceNodes, kTableKTableJoinNode.queryableStoreName(), kTableKTableJoinNode.joinMerger(), kTableKTableJoinNode, this.builder);
    }

    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
        return this.groupBy(selector, Grouped.with(null, null));
    }

    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, Grouped<K1, V1> grouped) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");
        GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<K1, V1>(grouped);
        String selectName = new NamedInternal(groupedInternal.name()).orElseGenerateWithPrefix(this.builder, SELECT_NAME);
        KTableRepartitionMap<? super K, ? super V, K1, V1> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
        ProcessorParameters<? super K, ? super V, K1, V1> processorParameters = new ProcessorParameters<K, V, K1, V1>(selectSupplier, selectName);
        TableRepartitionMapNode<? super K, ? super V> groupByMapNode = new TableRepartitionMapNode<K, V>(selectName, processorParameters);
        this.builder.addGraphNode(this.graphNode, groupByMapNode);
        this.enableSendingOldValues(true);
        return new KGroupedTableImpl<K1, V1>(this.builder, selectName, this.subTopologySourceNodes, groupedInternal, groupByMapNode);
    }

    public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        if (this.processorSupplier instanceof KTableSource) {
            KTableSource source = (KTableSource)this.processorSupplier;
            source.materialize();
            return new KTableSourceValueGetterSupplier(source.queryableName());
        }
        if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
            return ((KStreamAggProcessorSupplier)this.processorSupplier).view();
        }
        return ((KTableProcessorSupplier)this.processorSupplier).view();
    }

    public boolean enableSendingOldValues(boolean forceMaterialization) {
        if (!this.sendOldValues) {
            KTableProcessorSupplier tableProcessorSupplier;
            if (this.processorSupplier instanceof KTableSource) {
                KTableSource source = (KTableSource)this.processorSupplier;
                if (!forceMaterialization && !source.materialized()) {
                    return false;
                }
                source.enableSendingOldValues();
            } else if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
                ((KStreamAggProcessorSupplier)this.processorSupplier).enableSendingOldValues();
            } else if (this.processorSupplier instanceof KTableProcessorSupplier && !(tableProcessorSupplier = (KTableProcessorSupplier)this.processorSupplier).enableSendingOldValues(forceMaterialization)) {
                return false;
            }
            this.sendOldValues = true;
        }
        return true;
    }

    boolean sendingOldValueEnabled() {
        return this.sendOldValues;
    }

    private <VR> ProcessorParameters<K, VR, ?, ?> unsafeCastProcessorParametersToCompletelyDifferentType(ProcessorParameters<K, Change<V>, ?, ?> kObjectProcessorParameters) {
        return kObjectProcessorParameters;
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), Materialized.with(null, null), false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.as(new NamedInternal(named).name()), Materialized.with(null, null), false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, tableJoined, Materialized.with(null, null), false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.as(new NamedInternal(named).name()), materialized, false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, tableJoined, materialized, false);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), Materialized.with(null, null), true);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.as(new NamedInternal(named).name()), Materialized.with(null, null), true);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, tableJoined, Materialized.with(null, null), true);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.as(new NamedInternal(named).name()), materialized, true);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, tableJoined, materialized, true);
    }

    @Override
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
    }

    private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(KTable<KO, VO> foreignKeyTable, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, boolean leftJoin) {
        Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
        Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(tableJoined, "tableJoined can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        ((KTableImpl)foreignKeyTable).enableSendingOldValues(true);
        this.enableSendingOldValues(true);
        TableJoinedInternal tableJoinedInternal = new TableJoinedInternal(tableJoined);
        NamedInternal renamed = new NamedInternal(tableJoinedInternal.name());
        String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", this.builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
        Supplier<String> subscriptionPrimaryKeySerdePseudoTopic = () -> this.internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");
        Supplier<String> subscriptionForeignKeySerdePseudoTopic = () -> this.internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");
        Supplier<String> valueHashSerdePseudoTopic = () -> this.internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");
        this.builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName, InternalTopicProperties.empty());
        Serde foreignKeySerde = ((KTableImpl)foreignKeyTable).keySerde;
        SubscriptionWrapperSerde subscriptionWrapperSerde = new SubscriptionWrapperSerde(subscriptionPrimaryKeySerdePseudoTopic, this.keySerde);
        SubscriptionResponseWrapperSerde responseWrapperSerde = new SubscriptionResponseWrapperSerde(((KTableImpl)foreignKeyTable).valueSerde);
        CombinedKeySchema combinedKeySchema = new CombinedKeySchema(subscriptionForeignKeySerdePseudoTopic, foreignKeySerde, subscriptionPrimaryKeySerdePseudoTopic, this.keySerde);
        ForeignJoinSubscriptionSendNode subscriptionSendNode = new ForeignJoinSubscriptionSendNode(new ProcessorParameters(new SubscriptionSendProcessorSupplier(foreignKeyExtractor, subscriptionForeignKeySerdePseudoTopic, valueHashSerdePseudoTopic, foreignKeySerde, this.valueSerde == null ? null : this.valueSerde.serializer(), leftJoin), renamed.suffixWithOrElseGet("-subscription-registration-processor", this.builder, SUBSCRIPTION_REGISTRATION)));
        this.builder.addGraphNode(this.graphNode, subscriptionSendNode);
        StreamPartitioner<Object, SubscriptionWrapper> subscriptionSinkPartitioner = tableJoinedInternal.otherPartitioner() == null ? null : (topic, key, val, numPartitions) -> this.getPartition.apply(tableJoinedInternal.otherPartitioner().partitions(topic, key, null, numPartitions));
        StreamSinkNode<Object, SubscriptionWrapper> subscriptionSink = new StreamSinkNode<Object, SubscriptionWrapper>(renamed.suffixWithOrElseGet("-subscription-registration-sink", this.builder, SINK_NAME), new StaticTopicNameExtractor(subscriptionTopicName), new ProducedInternal<Object, SubscriptionWrapper>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner)));
        this.builder.addGraphNode(subscriptionSendNode, subscriptionSink);
        StreamSourceNode subscriptionSource = new StreamSourceNode(renamed.suffixWithOrElseGet("-subscription-registration-source", this.builder, SOURCE_NAME), Collections.singleton(subscriptionTopicName), new ConsumedInternal(Consumed.with(foreignKeySerde, subscriptionWrapperSerde)));
        this.builder.addGraphNode(subscriptionSink, subscriptionSource);
        HashSet<String> copartitionedRepartitionSources = new HashSet<String>(((KTableImpl)foreignKeyTable).subTopologySourceNodes);
        copartitionedRepartitionSources.add(subscriptionSource.nodeName());
        this.builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);
        StoreBuilder subscriptionStore = Stores.timestampedKeyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(renamed.suffixWithOrElseGet("-subscription-store", this.builder, FK_JOIN_STATE_STORE_NAME)), new Serdes.BytesSerde(), subscriptionWrapperSerde);
        this.builder.addStateStore(subscriptionStore);
        StatefulProcessorNode subscriptionReceiveNode = new StatefulProcessorNode(new ProcessorParameters(new SubscriptionReceiveProcessorSupplier(subscriptionStore, combinedKeySchema), renamed.suffixWithOrElseGet("-subscription-receive", this.builder, SUBSCRIPTION_PROCESSOR)), Collections.singleton(subscriptionStore), Collections.emptySet());
        this.builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
        KTableValueGetterSupplier<K, V> foreignKeyValueGetter = ((KTableImpl)foreignKeyTable).valueGetterSupplier();
        StatefulProcessorNode subscriptionJoinNode = new StatefulProcessorNode(new ProcessorParameters(new SubscriptionJoinProcessorSupplier(foreignKeyValueGetter), renamed.suffixWithOrElseGet("-subscription-join-foreign", this.builder, SUBSCRIPTION_PROCESSOR)), Collections.emptySet(), Collections.singleton(foreignKeyValueGetter));
        this.builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
        ForeignTableJoinNode foreignTableJoinNode = new ForeignTableJoinNode(new ProcessorParameters(new ForeignTableJoinProcessorSupplier(subscriptionStore, combinedKeySchema), renamed.suffixWithOrElseGet("-foreign-join-subscription", this.builder, SUBSCRIPTION_PROCESSOR)), Collections.singleton(subscriptionStore), Collections.emptySet());
        this.builder.addGraphNode(((KTableImpl)foreignKeyTable).graphNode, foreignTableJoinNode);
        String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", this.builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
        this.builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());
        StreamPartitioner<Object, SubscriptionResponseWrapper> foreignResponseSinkPartitioner = tableJoinedInternal.partitioner() == null ? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition() : (topic, key, val, numPartitions) -> this.getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions));
        StreamSinkNode<Object, SubscriptionResponseWrapper> foreignResponseSink = new StreamSinkNode<Object, SubscriptionResponseWrapper>(renamed.suffixWithOrElseGet("-subscription-response-sink", this.builder, SINK_NAME), new StaticTopicNameExtractor(finalRepartitionTopicName), new ProducedInternal<Object, SubscriptionResponseWrapper>(Produced.with(this.keySerde, responseWrapperSerde, foreignResponseSinkPartitioner)));
        this.builder.addGraphNode(subscriptionJoinNode, foreignResponseSink);
        this.builder.addGraphNode(foreignTableJoinNode, foreignResponseSink);
        StreamSourceNode foreignResponseSource = new StreamSourceNode(renamed.suffixWithOrElseGet("-subscription-response-source", this.builder, SOURCE_NAME), Collections.singleton(finalRepartitionTopicName), new ConsumedInternal(Consumed.with(this.keySerde, responseWrapperSerde)));
        this.builder.addGraphNode(foreignResponseSink, foreignResponseSource);
        HashSet<String> resultSourceNodes = new HashSet<String>(this.subTopologySourceNodes);
        resultSourceNodes.add(foreignResponseSource.nodeName());
        this.builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
        KTableValueGetterSupplier<K, V> primaryKeyValueGetter = this.valueGetterSupplier();
        StatefulProcessorNode<K, V> responseJoinNode = new StatefulProcessorNode<K, V>(new ProcessorParameters<K, V, VO, VR>(new ResponseJoinProcessorSupplier<K, V, VO, VR>(primaryKeyValueGetter, this.valueSerde == null ? null : this.valueSerde.serializer(), valueHashSerdePseudoTopic, joiner, leftJoin), renamed.suffixWithOrElseGet("-subscription-response-resolver", this.builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)), Collections.emptySet(), Collections.singleton(primaryKeyValueGetter));
        this.builder.addGraphNode(foreignResponseSource, responseJoinNode);
        String resultProcessorName = renamed.suffixWithOrElseGet("-result", this.builder, FK_JOIN_OUTPUT_NAME);
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>(materialized, this.builder, FK_JOIN_OUTPUT_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        KTableSource resultProcessorSupplier = new KTableSource(materializedInternal.storeName(), materializedInternal.queryableStoreName());
        StoreBuilder<?> resultStore = new KeyValueStoreMaterializer<K, VR>(materializedInternal).materialize();
        TableProcessorNode resultNode = new TableProcessorNode(resultProcessorName, new ProcessorParameters(resultProcessorSupplier, resultProcessorName), resultStore);
        resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
        this.builder.addGraphNode(responseJoinNode, resultNode);
        return new KTableImpl<K, S, VR>(resultProcessorName, this.keySerde, materializedInternal.valueSerde(), resultSourceNodes, materializedInternal.storeName(), resultProcessorSupplier, resultNode, this.builder);
    }

    private static void maybeSetOutputVersioned(GraphNode tableNode, MaterializedInternal<?, ?, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        if (materializedInternal != null) {
            tableNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
        }
    }
}

