package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownTopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TopologyMetadata.class */
public class TopologyMetadata {
    private Logger log;
    public static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
    private final StreamsConfig config;
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final TopologyVersion version;
    private final TaskExecutionMetadata taskExecutionMetadata;
    private final Set<String> pausedTopologies;
    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders;
    private ProcessorTopology globalTopology;
    private final Map<String, StateStore> globalStateStores;
    private final Set<String> allInputTopics;
    private final Map<String, Long> threadVersions;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/TopologyMetadata$Subtopology.class */
    public static class Subtopology implements Comparable<Subtopology> {
        final int nodeGroupId;
        final String namedTopology;

        public Subtopology(int i, String str) {
            this.nodeGroupId = i;
            this.namedTopology = str;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Subtopology subtopology = (Subtopology) obj;
            return this.nodeGroupId == subtopology.nodeGroupId && Objects.equals(this.namedTopology, subtopology.namedTopology);
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.nodeGroupId), this.namedTopology);
        }

        @Override // java.lang.Comparable
        public int compareTo(Subtopology subtopology) {
            if (this.nodeGroupId != subtopology.nodeGroupId) {
                return Integer.compare(this.nodeGroupId, subtopology.nodeGroupId);
            }
            if (this.namedTopology == null) {
                return subtopology.namedTopology == null ? 0 : -1;
            }
            if (subtopology.namedTopology == null) {
                return 1;
            }
            return this.namedTopology.compareTo(subtopology.namedTopology);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/TopologyMetadata$TopologyVersion.class */
    public static class TopologyVersion {
        public AtomicLong topologyVersion = new AtomicLong(0);
        public ReentrantLock topologyLock = new ReentrantLock();
        public Condition topologyCV = this.topologyLock.newCondition();
        public List<TopologyVersionListener> activeTopologyUpdateListeners = new LinkedList();
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/TopologyMetadata$TopologyVersionListener.class */
    public static class TopologyVersionListener {
        final long topologyVersion;
        final KafkaFutureImpl<Void> future;

        public TopologyVersionListener(long j, KafkaFutureImpl<Void> kafkaFutureImpl) {
            this.topologyVersion = j;
            this.future = kafkaFutureImpl;
        }
    }

    public TopologyMetadata(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig) {
        this.globalStateStores = new HashMap();
        this.allInputTopics = new HashSet();
        this.threadVersions = new ConcurrentHashMap();
        this.version = new TopologyVersion();
        this.processingMode = StreamsConfigUtils.processingMode(streamsConfig);
        this.config = streamsConfig;
        this.log = LoggerFactory.getLogger(getClass());
        this.pausedTopologies = ConcurrentHashMap.newKeySet();
        this.builders = new ConcurrentSkipListMap();
        if (internalTopologyBuilder.hasNamedTopology()) {
            this.builders.put(internalTopologyBuilder.topologyName(), internalTopologyBuilder);
        } else {
            this.builders.put(UNNAMED_TOPOLOGY, internalTopologyBuilder);
        }
        this.taskExecutionMetadata = new TaskExecutionMetadata(this.builders.keySet(), this.pausedTopologies, this.processingMode);
    }

    public TopologyMetadata(ConcurrentNavigableMap<String, InternalTopologyBuilder> concurrentNavigableMap, StreamsConfig streamsConfig) {
        this.globalStateStores = new HashMap();
        this.allInputTopics = new HashSet();
        this.threadVersions = new ConcurrentHashMap();
        this.version = new TopologyVersion();
        this.processingMode = StreamsConfigUtils.processingMode(streamsConfig);
        this.config = streamsConfig;
        this.log = LoggerFactory.getLogger(getClass());
        this.pausedTopologies = ConcurrentHashMap.newKeySet();
        this.builders = concurrentNavigableMap;
        if (concurrentNavigableMap.isEmpty()) {
            this.log.info("Created an empty KafkaStreams app with no topology");
        }
        this.taskExecutionMetadata = new TaskExecutionMetadata(concurrentNavigableMap.keySet(), this.pausedTopologies, this.processingMode);
    }

    public void setLog(LogContext logContext) {
        this.log = logContext.logger(getClass());
    }

    public StreamsConfigUtils.ProcessingMode processingMode() {
        return this.processingMode;
    }

    public long topologyVersion() {
        return this.version.topologyVersion.get();
    }

    private void lock() {
        this.version.topologyLock.lock();
    }

    private void unlock() {
        this.version.topologyLock.unlock();
    }

    public Collection<String> sourceTopicsForTopology(String str) {
        return ((InternalTopologyBuilder) this.builders.get(str)).fullSourceTopicNames();
    }

    public boolean needsUpdate(String str) {
        return this.threadVersions.get(str).longValue() < topologyVersion();
    }

    public void registerThread(String str) {
        this.threadVersions.put(str, 0L);
    }

    public void unregisterThread(String str) {
        this.threadVersions.remove(str);
        maybeNotifyTopologyVersionListeners();
    }

    public TaskExecutionMetadata taskExecutionMetadata() {
        return this.taskExecutionMetadata;
    }

    public void executeTopologyUpdatesAndBumpThreadVersion(Consumer<Set<String>> consumer, Consumer<Set<String>> consumer2) {
        try {
            this.version.topologyLock.lock();
            long j = topologyVersion();
            consumer.accept(namedTopologiesView());
            consumer2.accept(namedTopologiesView());
            this.threadVersions.put(Thread.currentThread().getName(), Long.valueOf(j));
            this.version.topologyLock.unlock();
        } catch (Throwable th) {
            this.version.topologyLock.unlock();
            throw th;
        }
    }

    public void maybeNotifyTopologyVersionListeners() {
        try {
            lock();
            long minimumThreadVersion = minimumThreadVersion();
            ListIterator<TopologyVersionListener> listIterator = this.version.activeTopologyUpdateListeners.listIterator();
            while (listIterator.hasNext()) {
                TopologyVersionListener next = listIterator.next();
                if (minimumThreadVersion >= next.topologyVersion) {
                    next.future.complete((Object) null);
                    listIterator.remove();
                    this.log.info("All threads are now on topology version {}", Long.valueOf(next.topologyVersion));
                }
            }
        } finally {
            unlock();
        }
    }

    private long minimumThreadVersion() {
        return this.threadVersions.values().stream().min((v0, v1) -> {
            return Long.compare(v0, v1);
        }).orElse(Long.MAX_VALUE).longValue();
    }

    public void wakeupThreads() {
        try {
            lock();
            this.version.topologyCV.signalAll();
        } finally {
            unlock();
        }
    }

    public void maybeWaitForNonEmptyTopology(Supplier<StreamThread.State> supplier) {
        if (isEmpty() && supplier.get().isAlive()) {
            try {
                lock();
                while (isEmpty() && supplier.get().isAlive()) {
                    try {
                        this.log.debug("Detected that the topology is currently empty, waiting for something to process");
                        this.version.topologyCV.await();
                    } catch (InterruptedException e) {
                        this.log.error("StreamThread was interrupted while waiting on empty topology", e);
                    }
                }
            } finally {
                unlock();
            }
        }
    }

    public void registerAndBuildNewTopology(KafkaFutureImpl<Void> kafkaFutureImpl, InternalTopologyBuilder internalTopologyBuilder) {
        try {
            try {
                lock();
                buildAndVerifyTopology(internalTopologyBuilder);
                this.log.info("New NamedTopology {} passed validation and will be added, old topology version is {}", internalTopologyBuilder.topologyName(), Long.valueOf(this.version.topologyVersion.get()));
                this.version.topologyVersion.incrementAndGet();
                this.version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), kafkaFutureImpl));
                this.builders.put(internalTopologyBuilder.topologyName(), internalTopologyBuilder);
                wakeupThreads();
                this.log.info("Added NamedTopology {} and updated topology version to {}", internalTopologyBuilder.topologyName(), Long.valueOf(this.version.topologyVersion.get()));
                unlock();
            } catch (Throwable th) {
                this.log.error("Failed to add NamedTopology {}, please retry the operation.", internalTopologyBuilder.topologyName());
                kafkaFutureImpl.completeExceptionally(th);
                unlock();
            }
        } catch (Throwable th2) {
            unlock();
            throw th2;
        }
    }

    public void pauseTopology(String str) {
        this.pausedTopologies.add(str);
    }

    public boolean isPaused(String str) {
        return this.pausedTopologies.contains(getTopologyNameOrElseUnnamed(str));
    }

    public void resumeTopology(String str) {
        this.pausedTopologies.remove(str);
    }

    public KafkaFuture<Void> unregisterTopology(KafkaFutureImpl<Void> kafkaFutureImpl, String str) {
        try {
            try {
                lock();
                this.log.info("Beginning removal of NamedTopology {}, old topology version is {}", str, Long.valueOf(this.version.topologyVersion.get()));
                this.version.topologyVersion.incrementAndGet();
                this.version.activeTopologyUpdateListeners.add(new TopologyVersionListener(topologyVersion(), kafkaFutureImpl));
                InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder) this.builders.remove(str);
                List<String> fullSourceTopicNames = internalTopologyBuilder.fullSourceTopicNames();
                Set<String> set = this.allInputTopics;
                Objects.requireNonNull(set);
                fullSourceTopicNames.forEach((v1) -> {
                    r1.remove(v1);
                });
                List<String> allSourcePatternStrings = internalTopologyBuilder.allSourcePatternStrings();
                Set<String> set2 = this.allInputTopics;
                Objects.requireNonNull(set2);
                allSourcePatternStrings.forEach((v1) -> {
                    r1.remove(v1);
                });
                this.log.info("Finished removing NamedTopology {}, topology version was updated to {}", str, Long.valueOf(this.version.topologyVersion.get()));
                unlock();
            } catch (Throwable th) {
                this.log.error("Failed to remove NamedTopology {}, please retry.", str);
                kafkaFutureImpl.completeExceptionally(th);
                unlock();
            }
            return kafkaFutureImpl;
        } catch (Throwable th2) {
            unlock();
            throw th2;
        }
    }

    public TopologyConfig.TaskConfig taskConfig(TaskId taskId) {
        return lookupBuilderForTask(taskId).topologyConfigs().getTaskConfig();
    }

    public void buildAndRewriteTopology() {
        applyToEachBuilder(this::buildAndVerifyTopology);
    }

    private void buildAndVerifyTopology(InternalTopologyBuilder internalTopologyBuilder) {
        internalTopologyBuilder.rewriteTopology(this.config);
        internalTopologyBuilder.buildTopology();
        HashSet hashSet = new HashSet(this.allInputTopics);
        int size = hashSet.size();
        List<String> fullSourceTopicNames = internalTopologyBuilder.fullSourceTopicNames();
        List<String> allSourcePatternStrings = internalTopologyBuilder.allSourcePatternStrings();
        HashSet hashSet2 = new HashSet(fullSourceTopicNames);
        hashSet2.addAll(allSourcePatternStrings);
        int size2 = hashSet2.size();
        hashSet.addAll(hashSet2);
        if (hashSet.size() != size + size2) {
            fullSourceTopicNames.retainAll(hashSet);
            allSourcePatternStrings.retainAll(hashSet);
            this.log.error("Tried to add the NamedTopology {} but it had overlap with other input topics {} or patterns {}", new Object[]{internalTopologyBuilder.topologyName(), fullSourceTopicNames, allSourcePatternStrings});
            throw new TopologyException("Named Topologies may not subscribe to the same input topics or patterns");
        }
        ProcessorTopology buildGlobalStateTopology = internalTopologyBuilder.buildGlobalStateTopology();
        if (buildGlobalStateTopology != null) {
            if (internalTopologyBuilder.topologyName() != null) {
                throw new IllegalStateException("Global state stores are not supported with Named Topologies");
            }
            if (this.globalTopology != null) {
                throw new TopologyException("Topology builder had global state, but global topology has already been set");
            }
            this.globalTopology = buildGlobalStateTopology;
            this.globalStateStores.putAll(internalTopologyBuilder.globalStateStores());
        }
        this.allInputTopics.addAll(hashSet2);
    }

    public int numStreamThreads(StreamsConfig streamsConfig) {
        int intValue = streamsConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue();
        if (hasNamedTopologies()) {
            if (hasNoLocalTopology()) {
                this.log.error("Detected a named topology with no input topics, a named topology may not be empty.");
                throw new TopologyException("Topology has no stream threads and no global threads, must subscribe to at least one source topic or pattern.");
            }
        } else if (hasNoLocalTopology() && !hasGlobalTopology()) {
            this.log.error("Topology with no input topics will create no stream threads and no global thread.");
            throw new TopologyException("Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.");
        }
        if (intValue == 0 || !hasNoLocalTopology()) {
            return intValue;
        }
        this.log.info("Overriding number of StreamThreads to zero for global-only topology");
        return 0;
    }

    public boolean hasNamedTopologies() {
        return !this.builders.containsKey(UNNAMED_TOPOLOGY);
    }

    public Set<String> namedTopologiesView() {
        return hasNamedTopologies() ? Collections.unmodifiableSet(this.builders.keySet()) : Collections.emptySet();
    }

    public boolean hasGlobalTopology() {
        return evaluateConditionIsTrueForAnyBuilders((v0) -> {
            return v0.hasGlobalStores();
        });
    }

    public boolean hasNoLocalTopology() {
        return evaluateConditionIsTrueForAnyBuilders((v0) -> {
            return v0.hasNoLocalTopology();
        });
    }

    public boolean hasPersistentStores() {
        if (hasNamedTopologies()) {
            return true;
        }
        return evaluateConditionIsTrueForAnyBuilders((v0) -> {
            return v0.hasPersistentStores();
        });
    }

    public boolean hasStore(String str) {
        return evaluateConditionIsTrueForAnyBuilders(internalTopologyBuilder -> {
            return Boolean.valueOf(internalTopologyBuilder.hasStore(str));
        });
    }

    public boolean hasOffsetResetOverrides() {
        return hasNamedTopologies() || evaluateConditionIsTrueForAnyBuilders((v0) -> {
            return v0.hasOffsetResetOverrides();
        });
    }

    public Optional<AutoOffsetResetStrategy> offsetResetStrategy(String str) {
        for (InternalTopologyBuilder internalTopologyBuilder : this.builders.values()) {
            if (internalTopologyBuilder.containsTopic(str)) {
                return Optional.ofNullable(internalTopologyBuilder.offsetResetStrategy(str));
            }
        }
        this.log.warn("Unable to look up offset reset strategy for topic {} as this topic does not appear in the sources of any of the current topologies: {}\n This may be due to natural race condition when removing a topology but it should not persist or appear frequently.", str, namedTopologiesView());
        return null;
    }

    public Collection<String> fullSourceTopicNamesForTopology(String str) {
        Objects.requireNonNull(str, "topology name must not be null");
        return lookupBuilderForNamedTopology(str).fullSourceTopicNames();
    }

    public Collection<String> allFullSourceTopicNames() {
        ArrayList arrayList = new ArrayList();
        applyToEachBuilder(internalTopologyBuilder -> {
            arrayList.addAll(internalTopologyBuilder.fullSourceTopicNames());
        });
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pattern sourceTopicPattern() {
        StringBuilder sb = new StringBuilder();
        applyToEachBuilder(internalTopologyBuilder -> {
            String sourceTopicPatternString = internalTopologyBuilder.sourceTopicPatternString();
            if (sourceTopicPatternString.isEmpty()) {
                return;
            }
            sb.append(sourceTopicPatternString).append("|");
        });
        if (sb.length() <= 0) {
            return EMPTY_ZERO_LENGTH_PATTERN;
        }
        sb.setLength(sb.length() - 1);
        return Pattern.compile(sb.toString());
    }

    public boolean usesPatternSubscription() {
        return evaluateConditionIsTrueForAnyBuilders((v0) -> {
            return v0.usesPatternSubscription();
        });
    }

    public boolean isEmpty() {
        return this.builders.isEmpty();
    }

    public String topologyDescriptionString() {
        if (isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        applyToEachBuilder(internalTopologyBuilder -> {
            sb.append(internalTopologyBuilder.describe().toString());
        });
        return sb.toString();
    }

    public ProcessorTopology buildSubtopology(TaskId taskId) {
        return lookupBuilderForTask(taskId).buildSubtopology(taskId.subtopology());
    }

    public ProcessorTopology globalTaskTopology() {
        if (hasNamedTopologies()) {
            throw new IllegalStateException("Global state stores are not supported with Named Topologies");
        }
        return this.globalTopology;
    }

    public Map<String, StateStore> globalStateStores() {
        return this.globalStateStores;
    }

    public Map<String, List<String>> stateStoreNameToSourceTopicsForTopology(String str) {
        return lookupBuilderForNamedTopology(str).stateStoreNameToFullSourceTopicNames();
    }

    public Set<String> stateStoreNamesForSubtopology(String str, int i) {
        return lookupBuilderForNamedTopology(str).stateStoreNamesForSubtopology(i);
    }

    public Map<String, List<String>> stateStoreNameToSourceTopics() {
        HashMap hashMap = new HashMap();
        applyToEachBuilder(internalTopologyBuilder -> {
            hashMap.putAll(internalTopologyBuilder.stateStoreNameToFullSourceTopicNames());
        });
        return hashMap;
    }

    public String storeForChangelogTopic(String str) {
        Iterator it = this.builders.values().iterator();
        while (it.hasNext()) {
            String storeForChangelogTopic = ((InternalTopologyBuilder) it.next()).storeForChangelogTopic(str);
            if (storeForChangelogTopic != null) {
                return storeForChangelogTopic;
            }
        }
        this.log.warn("Unable to locate any store for topic {}", str);
        return "";
    }

    public Collection<String> sourceTopicsForStore(String str, String str2) {
        return lookupBuilderForNamedTopology(str2).sourceTopicsForStore(str);
    }

    public static String getTopologyNameOrElseUnnamed(String str) {
        return str == null ? UNNAMED_TOPOLOGY : str;
    }

    public Map<Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyTopicsInfoMapExcluding(Set<String> set) {
        HashMap hashMap = new HashMap();
        applyToEachBuilder(internalTopologyBuilder -> {
            if (set.contains(internalTopologyBuilder.topologyName())) {
                return;
            }
            hashMap.putAll(internalTopologyBuilder.subtopologyToTopicsInfo());
        });
        return hashMap;
    }

    public Map<String, Map<Subtopology, InternalTopologyBuilder.TopicsInfo>> topologyToSubtopologyTopicsInfoMap() {
        HashMap hashMap = new HashMap();
        applyToEachBuilder(internalTopologyBuilder -> {
            hashMap.put(internalTopologyBuilder.topologyName(), internalTopologyBuilder.subtopologyToTopicsInfo());
        });
        return hashMap;
    }

    public Map<String, List<String>> nodeToSourceTopics(TaskId taskId) {
        return lookupBuilderForTask(taskId).nodeToSourceTopics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addSubscribedTopicsFromMetadata(Set<String> set, String str) {
        applyToEachBuilder(internalTopologyBuilder -> {
            internalTopologyBuilder.addSubscribedTopicsFromMetadata(set, str);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addSubscribedTopicsFromAssignment(Set<TopicPartition> set, String str) {
        applyToEachBuilder(internalTopologyBuilder -> {
            internalTopologyBuilder.addSubscribedTopicsFromAssignment(set, str);
        });
    }

    public Collection<Set<String>> copartitionGroups() {
        ArrayList arrayList = new ArrayList();
        applyToEachBuilder(internalTopologyBuilder -> {
            arrayList.addAll(internalTopologyBuilder.copartitionGroups());
        });
        return arrayList;
    }

    private InternalTopologyBuilder lookupBuilderForTask(TaskId taskId) {
        InternalTopologyBuilder internalTopologyBuilder = taskId.topologyName() == null ? (InternalTopologyBuilder) this.builders.get(UNNAMED_TOPOLOGY) : (InternalTopologyBuilder) this.builders.get(taskId.topologyName());
        if (internalTopologyBuilder == null) {
            throw new UnknownTopologyException("Unable to locate topology builder", taskId.topologyName());
        }
        return internalTopologyBuilder;
    }

    public Collection<NamedTopology> allNamedTopologies() {
        return (Collection) this.builders.values().stream().map((v0) -> {
            return v0.namedTopology();
        }).collect(Collectors.toSet());
    }

    public InternalTopologyBuilder lookupBuilderForNamedTopology(String str) {
        return (InternalTopologyBuilder) this.builders.get(getTopologyNameOrElseUnnamed(str));
    }

    private boolean evaluateConditionIsTrueForAnyBuilders(Function<InternalTopologyBuilder, Boolean> function) {
        Iterator it = this.builders.values().iterator();
        while (it.hasNext()) {
            if (function.apply((InternalTopologyBuilder) it.next()).booleanValue()) {
                return true;
            }
        }
        return false;
    }

    private void applyToEachBuilder(Consumer<InternalTopologyBuilder> consumer) {
        Iterator it = this.builders.values().iterator();
        while (it.hasNext()) {
            consumer.accept((InternalTopologyBuilder) it.next());
        }
    }
}
